In [None]:
!pip install natasha

In [None]:
!pip install lexicalrichness

In [None]:
!pip install catboost

In [None]:
!pip install -U deep-translator

In [None]:
!pip install transformers

In [None]:
import pandas as pd
from natasha import Segmenter, Doc, NewsEmbedding, NewsMorphTagger, MorphVocab
from lexicalrichness import LexicalRichness
import numpy as np
from sklearn.model_selection import GridSearchCV
from sklearn.feature_selection import mutual_info_classif, RFECV, SelectKBest
from sklearn.model_selection import KFold
from sklearn.preprocessing import MinMaxScaler
from sklearn.pipeline import Pipeline

from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier

import lightgbm as lgb
import xgboost as xgb
from catboost import CatBoostClassifier

from transformers import T5ForConditionalGeneration, T5Tokenizer
from deep_translator import GoogleTranslator

In [None]:
class ParaphraseAugmentation():
    """Augment a text dataset with sentence-level paraphrases.

    A text is split into sentences with the Natasha segmenter, every sentence
    is rewritten by the 'cointegrated/rut5-base-paraphraser' T5 model, and the
    rewritten sentences are joined back into one text.
    """

    def __init__(self):
        """Load the paraphrasing model/tokenizer and the Natasha components."""
        # Local import: torch is only needed here to choose the device.
        import torch

        MODEL_NAME = 'cointegrated/rut5-base-paraphraser'
        self.model = T5ForConditionalGeneration.from_pretrained(MODEL_NAME)
        self.tokenizer = T5Tokenizer.from_pretrained(MODEL_NAME)
        # Use the GPU when one is available; fall back to the CPU instead of crashing.
        self.model.to('cuda' if torch.cuda.is_available() else 'cpu')
        self.model.eval()
        self.segmenter = Segmenter()
        # The embedding and morph tagger are not needed for paraphrasing; they are
        # kept as attributes only so existing code that reads them keeps working.
        self.emb = NewsEmbedding()
        self.morph_tagger = NewsMorphTagger(self.emb)

    def paraphrase(self, text, beams=9, grams=3, do_sample=False):
        """Return `text` with every sentence paraphrased by the model.

        Args:
            text: source text; it is split into sentences by the segmenter.
            beams: passed to `generate` as `num_beams`.
            grams: passed to `generate` as `encoder_no_repeat_ngram_size`.
            do_sample: passed to `generate` as `do_sample`.

        Returns:
            The paraphrased sentences joined with single spaces (an empty
            string when the segmenter finds no sentences).
        """
        doc = Doc(text)
        doc.segment(self.segmenter)
        paraphrased_sentences = []
        for sentence in doc.sents:
            x = self.tokenizer(sentence.text, return_tensors='pt', padding=True).to(self.model.device)
            # Allow the output to be up to 1.5x the input length plus a small margin.
            max_size = int(x.input_ids.shape[1] * 1.5 + 10)
            out = self.model.generate(**x, encoder_no_repeat_ngram_size=grams, num_beams=beams, max_length=max_size, do_sample=do_sample)
            paraphrased_sentences.append(self.tokenizer.decode(out[0], skip_special_tokens=True))
        # Joining avoids the leading space that incremental concatenation produced.
        return ' '.join(paraphrased_sentences)

    def augmentation(self, csv_file_path):
        """Append a paraphrased copy of every row and save the result.

        Reads a ';'-separated cp1251 CSV whose header is on the second line,
        drops rows with missing values, adds one paraphrased row (same 'Style')
        per original row and writes 'texts_augmented (paraphrase).csv'.
        """
        dataframe = pd.read_csv(csv_file_path, sep=';', encoding="cp1251", header=1).dropna().reset_index(drop=True)
        new_rows = []
        for row in range(dataframe.shape[0]):
            print(row)  # progress indicator
            new_rows.append({'Text': self.paraphrase(dataframe.loc[row, 'Text'], beams=10, grams=7),
                             'Style': dataframe.loc[row, 'Style']})
        # Append all new rows at once instead of enlarging the frame on every iteration.
        if new_rows:
            dataframe = pd.concat([dataframe, pd.DataFrame(new_rows)], ignore_index=True)
        dataframe.to_csv('texts_augmented (paraphrase).csv', sep=';', encoding='cp1251', errors='ignore', index=False)

In [None]:
class TranslateAugmentation():
    """Augment a text dataset by chained machine translation.

    Each text is translated to English, then to German, then to Russian
    using `GoogleTranslator` with automatic source-language detection.
    """

    def __init__(self):
        """Create one translator per target language."""
        self.translator_en = GoogleTranslator(source='auto', target='en')
        self.translator_de = GoogleTranslator(source='auto', target='de')
        self.translator_ru = GoogleTranslator(source='auto', target='ru')

    def translate(self, text):
        """Return `text` after the en -> de -> ru translation chain."""
        translated_en = self.translator_en.translate(text)
        translated_de = self.translator_de.translate(translated_en)
        translated_ru = self.translator_ru.translate(translated_de)
        return translated_ru

    def augmentation(self, csv_file_path):
        """Append a back-translated copy of every row and save the result.

        Reads a ';'-separated cp1251 CSV whose header is on the second line,
        drops rows with missing values, adds one translated row (same 'Style')
        per original row and writes 'texts_augmented (translation).csv'.
        Rows whose translation raises are skipped, and each skip is reported.
        """
        dataframe = pd.read_csv(csv_file_path, sep=';', encoding="cp1251", header=1).dropna().reset_index(drop=True)
        new_rows = []
        skipped = 0
        for row in range(dataframe.shape[0]):
            print(row)  # progress indicator
            try:
                new_text = self.translate(dataframe.loc[row, 'Text'])
            except Exception as e:
                # Best effort: one failed request must not abort the whole run,
                # but the failure is reported instead of being silently dropped.
                skipped += 1
                print('Row ' + str(row) + ' skipped, translation failed: ' + repr(e))
                continue
            new_rows.append({'Text': new_text, 'Style': dataframe.loc[row, 'Style']})
        if skipped:
            print('Rows skipped because translation failed: ' + str(skipped))
        # Append all new rows at once instead of enlarging the frame on every iteration.
        if new_rows:
            dataframe = pd.concat([dataframe, pd.DataFrame(new_rows)], ignore_index=True)
        dataframe.to_csv('texts_augmented (translation).csv', sep=';', encoding='cp1251', errors='ignore', index=False)

In [None]:
# Augmenter that rewrites texts through a chain of machine translations.
translator = TranslateAugmentation()

In [None]:
# Reads 'texts.csv' and writes 'texts_augmented (translation).csv'.
translator.augmentation('texts.csv')

In [None]:
# Augmenter that rewrites texts with the 'cointegrated/rut5-base-paraphraser' model.
paraphraser = ParaphraseAugmentation()

In [None]:
# Reads 'texts.csv' and writes 'texts_augmented (paraphrase).csv'.
paraphraser.augmentation('texts.csv')

In [None]:
class FeatureExtractionModule():
  """Convert texts into fixed-length vectors of stylometric features.

  Texts are segmented and morphologically tagged with Natasha; the features
  are word/sentence length averages, part-of-speech frequencies, frequencies
  of adjacent part-of-speech pairs, punctuation frequencies and a lexical
  richness score. All ratio features return 0.0 when their denominator is
  zero (e.g. a text with no words or no punctuation) instead of raising.
  """

  # Feature order shared by fromCsvToCsv (column order) and fromTextToVector
  # (vector positions). 'adv_noun_freq' is computed from the ADJ/NOUN pair; the
  # historical name is kept because it is a column name of the produced dataset.
  FEATURE_NAMES = ('average_word_length', 'average_word_count', 'log_connection',
                   'verb_freq', 'noun_freq', 'adv_freq', 'adj_freq', 'propn_freq',
                   'verb_noun_freq', 'verb_adv_freq', 'noun_noun_freq', 'adv_noun_freq',
                   'dinamism_static',
                   'dot_freq', 'comma_freq', 'colon_freq', 'semicolon_freq', 'quote_freq',
                   'exclamation_freq', 'question_freq', 'dash_freq',
                   'lex_rich')

  # Part-of-speech tags that are not counted as words.
  _NON_WORD_POS = ('PUNCT', 'X', 'NUM')

  # Part-of-speech tags counted as function words by logicalConnectionCoefficient.
  _SERVICE_POS = ('ADP', 'PART', 'CONJ', 'CCONJ', 'INTJ', 'SCONJ')

  def __init__(self):
    """Create the Natasha segmenter, embedding, morph tagger and vocabulary."""
    self.segmenter = Segmenter()
    self.emb = NewsEmbedding()
    self.morph_tagger = NewsMorphTagger(self.emb)
    self.morph_vocab = MorphVocab()

  def _analyze(self, text):
    """Segment and morphologically tag `text`; return the Natasha Doc."""
    doc = Doc(text)
    doc.segment(self.segmenter)
    doc.tag_morph(self.morph_tagger)
    return doc

  def _featureValues(self, doc):
    """Return the feature values of `doc` as a list ordered like FEATURE_NAMES."""
    return [
        self.averageWordLength(doc),
        self.averageWordCount(doc),
        self.logicalConnectionCoefficient(doc),

        self.posFrequencyCoefficient(doc, 'VERB'),
        self.posFrequencyCoefficient(doc, 'NOUN'),
        self.posFrequencyCoefficient(doc, 'ADV'),
        self.posFrequencyCoefficient(doc, 'ADJ'),
        self.posFrequencyCoefficient(doc, 'PROPN'),

        self.posCombinationFrequencyCoefficient(doc, 'VERB', 'NOUN'),
        self.posCombinationFrequencyCoefficient(doc, 'VERB', 'ADV'),
        self.posCombinationFrequencyCoefficient(doc, 'NOUN', 'NOUN'),
        self.posCombinationFrequencyCoefficient(doc, 'ADJ', 'NOUN'),

        self.dynamismStaticTextCoefficient(doc),

        self.punctFrequencyCoefficient(doc, '.'),
        self.punctFrequencyCoefficient(doc, ','),
        self.punctFrequencyCoefficient(doc, ':'),
        self.punctFrequencyCoefficient(doc, ';'),
        self.punctFrequencyCoefficient(doc, '"'),
        self.punctFrequencyCoefficient(doc, '!'),
        self.punctFrequencyCoefficient(doc, '?'),
        self.punctFrequencyCoefficient(doc, '—'),

        self.lexicalRichnessCoefficient(doc),
    ]

  def fromCsvToCsv(self, csv_file_path):
    """Convert a CSV dataset of texts into a DataFrame of feature vectors.

    'texts.csv' is read with the header on the second line and rows with
    missing values dropped; any other file is read with the header on the
    first line. The 'Style' column is converted to integer category codes.

    Returns:
        A float DataFrame with one row per text: the FEATURE_NAMES columns
        followed by the target column 'y (style)'.
    """
    if(csv_file_path!='texts.csv'):
        dataframe = pd.read_csv(csv_file_path, sep=';', encoding="cp1251")
    else:
        dataframe = pd.read_csv(csv_file_path, sep=';', encoding="cp1251", header=1).dropna().reset_index(drop=True)
    # Style labels: strings -> integer category codes.
    dataframe["Style"] = dataframe["Style"].astype('category').cat.codes

    # Collect all rows first and build the frame once; filling a DataFrame cell
    # by cell with .loc re-allocates on every new row and column.
    rows = []
    for csv_row in range(dataframe.shape[0]):
        doc = self._analyze(dataframe.loc[csv_row, "Text"])
        rows.append(self._featureValues(doc) + [dataframe.loc[csv_row, 'Style']])
    return pd.DataFrame(rows, columns=list(self.FEATURE_NAMES) + ['y (style)'], dtype=float)

  def fromTextToVector(self, text):
    """Convert one text (for prediction) into a 1-D feature array ordered like FEATURE_NAMES."""
    return np.array(self._featureValues(self._analyze(text)))

  def _isWord(self, token):
    """True when the token counts as a word (not punctuation, 'X' or a numeral)."""
    return token.pos not in self._NON_WORD_POS

  def wordCount(self, doc):
    """Number of word tokens in the text (punctuation, 'X' and numerals excluded)."""
    return sum(1 for token in doc.tokens if self._isWord(token))

  def punctCount(self, doc):
    """Number of punctuation tokens in the text."""
    return sum(1 for token in doc.tokens if token.pos == 'PUNCT')

  def averageWordLength(self, doc):
    """Mean length in characters of the word tokens; 0.0 when there are none."""
    token_len = [len(token.text) for token in doc.tokens if self._isWord(token)]
    if not token_len:
      return 0.0
    return np.mean(token_len)

  def averageWordCount(self, doc):
    """Mean number of words per sentence; 0.0 when there are no sentences."""
    sent_count = len(doc.sents)
    if sent_count == 0:
      return 0.0
    return self.wordCount(doc)/sent_count

  def posFrequencyCoefficient(self, doc, pos_tag):
    """Share of tokens tagged `pos_tag` relative to the word count; 0.0 for no words."""
    all_token_count = self.wordCount(doc)
    if all_token_count == 0:
      return 0.0
    pos_count = sum(1 for token in doc.tokens if token.pos == pos_tag)
    return pos_count/all_token_count

  def posCombinationFrequencyCoefficient(self, doc, pos_tag_1, pos_tag_2):
    """Frequency of adjacent part-of-speech pairs.

    Counts positions where a token tagged `pos_tag_1` is immediately preceded
    by a token tagged `pos_tag_2`, and divides by (word count - 1). Returns
    0.0 when the text has fewer than two words.
    """
    pair_slots = self.wordCount(doc) - 1
    if pair_slots <= 0:
      return 0.0
    tokens = doc.tokens
    pos_count = sum(1 for i in range(1, len(tokens))
                    if tokens[i].pos == pos_tag_1 and tokens[i-1].pos == pos_tag_2)
    return pos_count/pair_slots

  def punctFrequencyCoefficient(self, doc, punct_type):
    """Share of tokens equal to `punct_type` among all punctuation tokens.

    Returns 0.0 when the text contains no punctuation.
    """
    all_punct_count = self.punctCount(doc)
    if all_punct_count == 0:
      return 0.0
    punct_count = sum(1 for token in doc.tokens if token.text == punct_type)
    return punct_count/all_punct_count

  def dynamismStaticTextCoefficient(self, doc):
    """Ratio of verb-based pair frequencies to noun-based pair frequencies.

    (VERB/NOUN + VERB/ADV) divided by (NOUN/NOUN + ADJ/NOUN); 0 when the
    denominator is zero.
    """
    verb_noun = self.posCombinationFrequencyCoefficient(doc, 'VERB', 'NOUN')
    verb_adv = self.posCombinationFrequencyCoefficient(doc, 'VERB', 'ADV')
    noun_noun = self.posCombinationFrequencyCoefficient(doc, 'NOUN', 'NOUN')
    adj_noun = self.posCombinationFrequencyCoefficient(doc, 'ADJ', 'NOUN')
    static_part = noun_noun + adj_noun
    if static_part == 0:
      return 0
    return (verb_noun + verb_adv) / static_part

  def logicalConnectionCoefficient(self, doc):
    """Number of function-word tokens divided by three times the word count.

    Function words are the tags in _SERVICE_POS. Returns 0.0 for no words.
    """
    all_token_count = self.wordCount(doc)
    if all_token_count == 0:
      return 0.0
    service_word_count = sum(1 for token in doc.tokens if token.pos in self._SERVICE_POS)
    return service_word_count/(3*all_token_count)

  def lexicalRichnessCoefficient(self, doc, method='voc-D'):
    """Lexical richness of the lemmatized tokens, computed by `LexicalRichness`.

    Every token of the document (punctuation included) is lemmatized and the
    lemma list is passed to `LexicalRichness` without further preprocessing.

    Args:
        doc: a tagged Natasha Doc.
        method: one of 'TTR', 'RTTR', 'CTTR', 'MSTTR', 'MATTR', 'MTLD',
            'HD-D', 'voc-D', 'Herdan', 'Summer', 'Dugast', 'Maas', 'YuleK',
            'YuleI', 'HerdanVm', 'SimpsonD'.

    Returns:
        The selected measure, or None when `method` is not recognised.
    """
    list_of_tokens = []
    for token in doc.tokens:
        token.lemmatize(self.morph_vocab)
        list_of_tokens.append(token.lemma)
    lex = LexicalRichness(list_of_tokens, preprocessor=None, tokenizer=None)
    # Lazy dispatch table: only the requested measure is evaluated.
    measures = {
        'TTR': lambda: lex.ttr,
        'RTTR': lambda: lex.rttr,
        'CTTR': lambda: lex.cttr,
        'MSTTR': lambda: lex.msttr(segment_window=25),
        'MATTR': lambda: lex.mattr(window_size=25),
        'MTLD': lambda: lex.mtld(threshold=0.72),
        'HD-D': lambda: lex.hdd(draws=42),
        'voc-D': lambda: lex.vocd(ntokens=40, within_sample=100, iterations=3),
        'Herdan': lambda: lex.Herdan,
        'Summer': lambda: lex.Summer,
        'Dugast': lambda: lex.Dugast,
        'Maas': lambda: lex.Maas,
        'YuleK': lambda: lex.yulek,
        'YuleI': lambda: lex.yulei,
        'HerdanVm': lambda: lex.herdanvm,
        'SimpsonD': lambda: lex.simpsond,
    }
    compute = measures.get(method)
    if compute is None:
      return None
    return compute()

In [None]:
# Build the feature table from the translation-augmented dataset.
feature_extraction = FeatureExtractionModule()
dataset = feature_extraction.fromCsvToCsv('texts_augmented (translation).csv')

In [None]:
# Display the feature table.
dataset

In [None]:
# Every column except the last is a feature; the last column is the style label.
X_data = dataset.iloc[:, 0:-1]
y_data = dataset.iloc[:, -1]

In [None]:
def plot_and_print_mi_scores(X_data, y_data):
  """Plot the mutual information between each feature and the target.

  Draws a horizontal bar chart of the scores sorted in ascending order,
  labels every bar with its value and returns the feature names ordered
  from the highest score to the lowest.
  """
  ranked = pd.Series(
      mutual_info_classif(X_data, y_data),
      name="MI Scores",
      index=X_data.columns,
  ).sort_values(ascending=True)
  axes = ranked.plot(kind='barh', figsize=(12, 8), title='Mutual Information Scores', legend=False)
  axes.bar_label(axes.containers[0], label_type='edge')
  # The series is sorted ascending, so the reversed index is most informative first.
  return ranked.index[::-1]

In [None]:
# Chart of mutual-information scores; the cell output is the feature ranking.
plot_and_print_mi_scores(X_data, y_data)

In [None]:
# Min-max scale the features; the fitted scaler is saved later for prediction time.
# NOTE(review): the scaler is fitted on the whole dataset before cross-validation,
# so validation folds influence the scaling — confirm this is acceptable.
scaler = MinMaxScaler()
scaler.fit(X_data)
# In-place overwrite of X_data with the scaled values.
# NOTE(review): X_data was taken from `dataset` with iloc; depending on the pandas
# version this assignment may also modify `dataset` or emit a warning — verify.
X_data.iloc[:, :] = scaler.transform(X_data)

In [None]:
# Plain NumPy arrays consumed by BestEstimatorExtraction.
X = X_data.values
y = y_data.values

In [None]:
class BestEstimatorExtraction():
  """Hyperparameter/feature search and final fitting for a set of classifiers.

  For every model the class stores:
    * the estimator instance,
    * `*_best_parameters`: a single-value grid used by the `*Fit` methods
      (presumably the outcome of an earlier search — confirm),
    * a boolean feature mask selecting the columns of X used by that model,
    * `*_search_parameters`: the grid explored by the `*GridSearch` methods.
      These grids use the `estimator__` prefix because the search runs over
      an RFECV wrapper (or a Pipeline for KNN) around the model.
  """

  def __init__(self,
               X_data,
               y_data):
    """Store the data and create the models and their parameter grids.

    Args:
        X_data: 2-D feature array (indexed as `X[:, mask]` by modelFit).
        y_data: target labels.
    """
    self.X = X_data
    self.y = y_data

    # Models to train
    self.log_reg_model = LogisticRegression()
    self.log_reg_best_parameters_for_all_features = {'C': [50], 'max_iter': [1000], 'penalty': ['l1'], 'random_state': [42], 'solver': ['saga']}
    self.log_reg_best_parameters = {'C': [50], 'max_iter': [100], 'penalty': ['l2'], 'random_state': [42], 'solver': ['sag']}
    self.log_features = [True, True, True, False, True, True, False, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True]
    self.log_reg_search_parameters = {"estimator__max_iter":[100, 500, 1000, 5000],
                                      "estimator__C":[0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.3, 0.5, 1, 5, 10, 50, 100],
                                      "estimator__penalty":["elasticnet", "l1", "l2"],
                                      "estimator__solver" : ['newton-cg', 'lbfgs', 'liblinear', 'sag', 'saga'],
                                      "estimator__random_state": [42]}

    self.dec_tree_model = DecisionTreeClassifier()
    self.dec_tree_best_parameters_for_all_features = {'ccp_alpha': [0.01], 'criterion': ['entropy'], 'max_depth': [8], 'max_features': [0.8], 'min_samples_leaf': [1], 'min_samples_split': [2], 'random_state': [42]}
    self.dec_tree_best_parameters = {'ccp_alpha': [0.0], 'criterion': ['entropy'], 'max_depth': [8], 'max_features': [0.8], 'min_samples_leaf': [5], 'min_samples_split': [2], 'random_state': [42]}
    self.dec_tree_features = [True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, False, True, True, True, True]
    self.dec_tree_search_parameters = {'estimator__max_features': ['sqrt', 'log2', 0.2, 0.4, 0.6, 0.8],
                                       'estimator__ccp_alpha': [0.1, .01, .001, .0],
                                       'estimator__min_samples_leaf': [1, 5, 8, 11],
                                       'estimator__min_samples_split': [2, 3, 5, 7, 9],
                                       'estimator__max_depth' : [None, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
                                       'estimator__criterion' :['gini', 'entropy', 'log_loss'],
                                       'estimator__random_state': [42]}

    self.random_forest_model = RandomForestClassifier()
    self.random_forest_best_parameters_for_all_features = {'ccp_alpha': [0.001], 'criterion': ['entropy'], 'max_depth': [None], 'max_features': ['sqrt'], 'min_samples_leaf': [1], 'min_samples_split': [2], 'n_estimators': [1000], 'random_state': [42]}
    self.random_forest_best_parameters = {'ccp_alpha': [0.001], 'criterion': ['entropy'], 'max_depth': [9], 'max_features': ['sqrt'], 'min_samples_leaf': [1], 'min_samples_split': [2], 'n_estimators': [500], 'random_state': [42]}
    self.random_forest_features = [True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True]
    self.random_forest_search_parameters = {'estimator__n_estimators':[100, 250, 500, 1000],
                                            'estimator__max_features': ['sqrt', 'log2', 0.2, 0.4, 0.6, 0.8],
                                            'estimator__ccp_alpha': [0.1, .01, .001, 0.0],
                                            'estimator__min_samples_leaf': [1, 5, 8, 11],
                                            'estimator__min_samples_split': [2, 3, 5, 7, 9],
                                            'estimator__max_depth' : [None, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
                                            'estimator__criterion' :['gini', 'entropy', 'log_loss'],
                                            'estimator__random_state': [42]}

    self.svc_model = SVC()
    self.svc_best_parameters_for_all_features = {'C': [10], 'gamma': ['scale'], 'kernel': ['rbf'], 'random_state': [42]}
    self.svc_best_parameters = {'C': [10], 'gamma': ['scale'], 'kernel': ['linear'], 'random_state': [42]}
    self.svc_features = [True, True, True, True, True, True, False, True, True, False, True, True, False, False, True, True, True, True, True, True, True, True]
    self.svc_search_parameters = {'estimator__C': [0.1, 1, 10, 100, 1000],
                                  'estimator__gamma': ['scale', 'auto', 1, 0.1, 0.01, 0.001, 0.0001],
                                  'estimator__kernel': ['linear', 'poly', 'rbf', 'sigmoid'],
                                  'estimator__random_state': [42]}

    self.knn_model = KNeighborsClassifier()
    self.knn_best_parameters = {'n_neighbors': [17], 'p': [1], 'weights': ['distance']}
    self.knn_features = [True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True]

    self.xgboost_model = xgb.XGBClassifier(objective='multi:softmax', num_class=5, tree_method='hist')
    self.xgboost_best_parameters = {'booster': ['gbtree'], 'n_estimators': [500], 'learning_rate': [0.05], 'max_depth': [4], 'subsample': [0.4], 'reg_alpha': [0.1], 'reg_lambda': [1.0], "random_state": [42]}
    # Attribute name (without the 'g') is kept as-is because xgboostFit and possibly external code read it.
    self.xboost_features = [True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True]
    self.xgboost_search_parameters = {'estimator__booster' : ['gbtree'],
                                      'estimator__n_estimators':[100, 250, 500, 1000],
                                      'estimator__learning_rate' : [0.05, 0.1, 0.2, 0.3],
                                      'estimator__max_depth' : [None, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
                                      'estimator__subsample' : [0.4, 0.5, 0.6, 0.7, 0.8, 0.9],
                                      'estimator__reg_alpha' : [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1],
                                      'estimator__reg_lambda' : [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1],
                                      'estimator__random_state': [42]
                                      }

    self.catboost_model = CatBoostClassifier(verbose=False)
    self.catboost_best_parameters = {'n_estimators':[1000], 'learning_rate' : [0.2], 'max_depth' : [7], 'l2_leaf_reg':[3], 'random_strength': [0.2],'bagging_temperature':[1.0], 'border_count':[254], 'random_state': [42]}
    self.catboost_features = [True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True]
    # The 'estimator__' prefix is required: the grid is applied to the RFECV
    # wrapper built in modelGridSearch, which has no 'n_estimators' etc. of its own.
    self.catboost_search_parameters = {'estimator__n_estimators':[100, 250, 500, 1000],
                                      'estimator__learning_rate' : [0.05, 0.1, 0.2, 0.3],
                                      'estimator__max_depth' : [None, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
                                      'estimator__l2_leaf_reg':[1, 3, 5, 10, 100],
                                      'estimator__random_strength': [0.2, 0.5, 0.8, 1.1, 1.4],
                                      'estimator__bagging_temperature':[0.03, 0.09, 0.25, 0.75, 1.0],
                                      'estimator__border_count':[254],
                                      'estimator__random_state': [42]}

    self.lgbm_model = lgb.LGBMClassifier(objective='multiclass', n_jobs=-1)
    self.lgbm_best_parameters = {'n_estimators': [250],'max_depth': [None],'learning_rate' : [0.3],'subsample' : [0.4],'reg_alpha' : [0.1],'reg_lambda' : [0.1],'num_leaves': [32],'is_unbalance': [False],'boost_from_average': [False], "random_state": [42]}
    self.lgbm_features = [True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True]
    self.lgbm_search_parameters = {'estimator__n_estimators': [100, 250, 500, 1000],
                                   'estimator__max_depth': [None, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
                                   'estimator__learning_rate' : [0.05, 0.1, 0.2, 0.3],
                                   'estimator__subsample' : [0.4, 0.5, 0.6, 0.7, 0.8, 0.9],
                                   'estimator__reg_alpha' : [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1],
                                   'estimator__reg_lambda' : [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1],
                                   'estimator__num_leaves': [32],
                                   'estimator__is_unbalance': [False],
                                   'estimator__boost_from_average': [False],
                                   "estimator__random_state": [42]}

  # Search for the best feature subset and hyperparameters: RFECV + GridSearchCV
  def modelGridSearch(self, model, parameters):
    """Grid-search `parameters` over an RFECV wrapper around `model`.

    Both the feature elimination and the grid search use the same shuffled
    5-fold split and accuracy scoring. Returns the fitted GridSearchCV.
    """
    cv_method = KFold(n_splits=5, shuffle=True, random_state=42)
    rfecv = RFECV(estimator=model, step=1, cv=cv_method, scoring='accuracy')
    clf = GridSearchCV(rfecv, parameters, scoring = 'accuracy', cv=cv_method, verbose=0)
    clf_results = clf.fit(self.X, self.y)
    return clf_results

  def logisticRegressionGridSearch(self):
      """Run modelGridSearch for logistic regression."""
      return self.modelGridSearch(self.log_reg_model, self.log_reg_search_parameters)

  def decisionTreeGridSearch(self):
      """Run modelGridSearch for the decision tree."""
      return self.modelGridSearch(self.dec_tree_model, self.dec_tree_search_parameters)

  def randomForestGridSearch(self):
      """Run modelGridSearch for the random forest."""
      return self.modelGridSearch(self.random_forest_model, self.random_forest_search_parameters)

  def svcGridSearch(self):
      """Run modelGridSearch for the SVC."""
      return self.modelGridSearch(self.svc_model, self.svc_search_parameters)

  def xgboostGridSearch(self):
      """Run modelGridSearch for XGBoost."""
      return self.modelGridSearch(self.xgboost_model, self.xgboost_search_parameters)

  def catBoostGridSearch(self):
      """Run modelGridSearch for CatBoost."""
      return self.modelGridSearch(self.catboost_model, self.catboost_search_parameters)

  def lgbmGridSearch(self):
      """Run modelGridSearch for LightGBM."""
      return self.modelGridSearch(self.lgbm_model, self.lgbm_search_parameters)

  def knnGridSearch(self):
      """Grid-search KNN together with the number of selected features.

      KNN is searched inside a Pipeline with SelectKBest (mutual information)
      instead of RFECV. The candidate `k` values run from 1 to the number of
      columns in `self.X`. Returns the fitted GridSearchCV.
      """
      pipe = Pipeline([('selector', SelectKBest(mutual_info_classif, k=5)),
                     ('estimator', KNeighborsClassifier())])
      parameters = {'selector__k': np.arange(1, self.X.shape[1] + 1),
                    'estimator__n_neighbors': np.arange(1,50),
                    'estimator__weights' : ['uniform', 'distance'],
                    'estimator__p' : [1, 2]}
      cv_method = KFold(n_splits=5, shuffle=True, random_state=42)
      clf = GridSearchCV(pipe, parameters, scoring = 'accuracy', cv=cv_method, verbose=0)
      # Fit on the data given to this instance, not on notebook-level globals.
      clf_results = clf.fit(self.X, self.y)
      return clf_results

  # Training with the already found feature subsets and hyperparameters
  def modelFit(self, model, parameters, features):
    """Fit `model` on the masked features with a single-value parameter grid.

    GridSearchCV is used with shuffled 5-fold CV, so the printed score is the
    cross-validated accuracy and the returned estimator is refit on all data.

    Returns:
        (best_estimator, features) — the fitted model and the mask it was
        trained with.
    """
    cv_method = KFold(n_splits=5, shuffle=True, random_state=42)
    clf = GridSearchCV(model, parameters, scoring = 'accuracy', cv=cv_method, verbose=0)
    clf_results = clf.fit(self.X[:, features], self.y)
    print('Best accuracy score of : ' + model.__class__.__name__ + " " + str(clf_results.best_score_))
    return clf_results.best_estimator_, features

  def logisticRegressionFit(self):
      """Fit logistic regression with its stored parameters and feature mask."""
      return self.modelFit(self.log_reg_model, self.log_reg_best_parameters, self.log_features)

  def decisionTreeFit(self):
      """Fit the decision tree with its stored parameters and feature mask."""
      return self.modelFit(self.dec_tree_model, self.dec_tree_best_parameters, self.dec_tree_features)

  def randomForestFit(self):
      """Fit the random forest with its stored parameters and feature mask."""
      return self.modelFit(self.random_forest_model, self.random_forest_best_parameters, self.random_forest_features)

  def svcFit(self):
      """Fit the SVC with its stored parameters and feature mask."""
      return self.modelFit(self.svc_model, self.svc_best_parameters, self.svc_features)

  def knnFit(self):
      """Fit KNN with its stored parameters and feature mask."""
      return self.modelFit(self.knn_model, self.knn_best_parameters, self.knn_features)

  def xgboostFit(self):
      """Fit XGBoost with its stored parameters and feature mask."""
      return self.modelFit(self.xgboost_model, self.xgboost_best_parameters, self.xboost_features)

  def catBoostFit(self):
      """Fit CatBoost with its stored parameters and feature mask."""
      return self.modelFit(self.catboost_model, self.catboost_best_parameters, self.catboost_features)

  def lgbmFit(self):
      """Fit LightGBM with its stored parameters and feature mask."""
      return self.modelFit(self.lgbm_model, self.lgbm_best_parameters, self.lgbm_features)

  def modelPredict(self, model, text, feature_extraction, scaler, features):
    """Return `model.predict_proba` for a single raw text.

    The text is converted to a feature vector, given a leading batch axis,
    scaled with `scaler` and reduced to the columns selected by `features`.
    """
    text_features = feature_extraction.fromTextToVector(text)[None, :]
    text_features = scaler.transform(text_features)
    text_features = text_features[:, features]
    proba = model.predict_proba(text_features)
    return proba

In [None]:
# Holds the scaled data together with every model, its parameters and its feature mask.
bee = BestEstimatorExtraction(X, y)

In [None]:
# Fit each model with its stored parameters; every call prints the cross-validated
# accuracy and returns (fitted estimator, feature mask used for training).
log_reg, log_reg_features = bee.logisticRegressionFit()
dec_tree, dec_tree_features  = bee.decisionTreeFit()
random_forest, random_forest_features = bee.randomForestFit()
svc, svc_features = bee.svcFit()
knn, knn_features = bee.knnFit()
xgboost, xgboost_features = bee.xgboostFit()
catboost, catboost_features = bee.catBoostFit()
lgbm, lgbm_features = bee.lgbmFit()



Best accuracy score of : LogisticRegression 0.9016082920870014
Best accuracy score of : DecisionTreeClassifier 0.8318232767849271
Best accuracy score of : RandomForestClassifier 0.9156610118260474
Best accuracy score of : SVC 0.9041195954710897
Best accuracy score of : KNeighborsClassifier 0.9036044886084558
Best accuracy score of : XGBClassifier 0.9241961688139948
Best accuracy score of : CatBoostClassifier 0.9342350852004383
Best accuracy score of : LGBMClassifier 0.9292149972922255


Saving models

In [None]:
# NOTE(review): this import lives outside the notebook's main import cell.
import joblib

# Persist every fitted classifier to the working directory.
joblib.dump(log_reg, 'log_reg.pkl')
joblib.dump(dec_tree, 'dec_tree.pkl')
joblib.dump(random_forest, 'random_forest.pkl')
joblib.dump(svc, 'svc.pkl')
joblib.dump(knn, 'knn.pkl')
joblib.dump(xgboost, 'xgboost.pkl')
joblib.dump(catboost, 'catboost.pkl')
joblib.dump(lgbm, 'lgbm.pkl')

# The fitted MinMaxScaler is saved as well so new texts can be scaled the same way.
joblib.dump(scaler, 'scaler.pkl')

### **Neural Network**

In [None]:
!pip install lightning

In [None]:
!pip install optuna

In [None]:
import lightning.pytorch as pl
from torch.utils.data import TensorDataset, random_split, DataLoader
from torch import nn
import torch
import optuna
from optuna.integration import PyTorchLightningPruningCallback
from lightning.pytorch.callbacks.early_stopping import EarlyStopping
import torchmetrics
from lightning.pytorch.callbacks import Callback, ModelCheckpoint

In [None]:
# Rebuild the feature table for the neural-network section (repeats the earlier
# extraction, so this section can be run without the classical-model cells'
# variables; it still relies on the imports and class definitions above).
feature_extraction = FeatureExtractionModule()
dataset = feature_extraction.fromCsvToCsv('texts_augmented (translation).csv')

In [None]:
# Every column except the last is a feature; the last column is the style label.
X_data = dataset.iloc[:, 0:-1]
y_data = dataset.iloc[:, -1]

In [None]:
# Rank the features by mutual information with the target, most informative first.
# NOTE(review): mutual_info_classif is called without random_state, so the ranking
# may differ between runs — confirm whether a fixed seed is wanted.
mi_scores = mutual_info_classif(X_data, y_data)
mi_scores = pd.Series(mi_scores, name="MI Scores", index=X_data.columns)
mi_scores = mi_scores.sort_values(ascending=True)
feature_important_list = mi_scores.index[::-1]
# Positions of the ranked features in the original column order.
features_indexes = [list(X_data.columns).index(feature_important_list[i]) for i in range(len(feature_important_list))]

In [None]:
# Reorder the columns by importance so that X[:, 0:k] holds the k best features.
X_data = X_data.reindex(feature_important_list, axis=1)

In [None]:
# Min-max scale the reordered features. This rebinds `scaler`, which now expects
# the importance-ordered columns rather than the original column order.
scaler = MinMaxScaler()
scaler.fit(X_data)
X_data.iloc[:, :] = scaler.transform(X_data)

In [None]:
# Plain NumPy arrays read as globals by `objective`.
X = X_data.values
y = y_data.values

In [None]:
class MLPSearch(nn.Module):
    """Multilayer perceptron whose depth and widths are given at construction.

    Every hidden layer is Linear -> ReLU -> Dropout; a final Linear layer maps
    to `num_classes` outputs. No softmax is applied in `forward`.

    Args:
        in_features: size of the input vector.
        num_classes: size of the output vector.
        n_layers_out_features: iterable with the width of each hidden layer.
        dropout: dropout probability used after every hidden layer.
    """

    def __init__(self,
                 in_features,
                 num_classes,
                 n_layers_out_features,
                 dropout):
        super().__init__()
        # widths[i] -> widths[i + 1] describes the i-th hidden layer.
        widths = [in_features, *n_layers_out_features]
        stack = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stack += [nn.Linear(fan_in, fan_out), nn.ReLU(), nn.Dropout(dropout)]
        # Output layer takes the width of the last hidden layer (or the input).
        stack.append(nn.Linear(widths[-1], num_classes))

        self.layers = nn.Sequential(*stack)

    def forward(self, x):
        """Apply the layer stack to `x`."""
        return self.layers(x)

In [None]:
class ModelCompilation(pl.LightningModule):
    """LightningModule that bundles a network with its loss, metrics and optimizer.

    Args:
        model: the network to train.
        metrics: mapping of metric name -> callable `metric(pred, y)`.
        loss_function: callable `loss_function(pred, y)`.
        optimizer: optimizer class; called as `optimizer(parameters, lr=...)`.
        learning_rate: learning rate passed to the optimizer.
    """

    def __init__(self,
                 model:torch.nn.Module,
                 metrics:dict,
                 loss_function,
                 optimizer:torch.optim,
                 learning_rate:float):
        super().__init__()
        self.model = model
        # Register module-based metrics as submodules so they follow the module
        # across devices; anything that cannot be registered (plain callables,
        # names clashing with ModuleDict attributes) stays in the plain dict.
        try:
            self.metrics = nn.ModuleDict(metrics)
        except (TypeError, KeyError):
            self.metrics = metrics
        self.loss_function = loss_function
        self.optimizer = optimizer
        self.learning_rate = learning_rate
        self.save_hyperparameters(logger=False)

    def forward(self, x):
        """Return the network output for `x`."""
        pred = self.model.forward(x)
        return pred

    def configure_optimizers(self):
        """Instantiate the optimizer class over this module's parameters."""
        train_optimizer = self.optimizer(self.parameters(), lr=self.learning_rate)
        return train_optimizer

    def training_step(self, batch, batch_idx):
        """Compute and log the training loss/metrics; return the loss."""
        loss, pred, y = self.common_step(batch, batch_idx, 'train')
        return loss

    def validation_step(self, batch, batch_idx):
        """Compute and log the validation loss/metrics; return the loss."""
        loss, pred, y = self.common_step(batch, batch_idx, 'val')
        return loss

    def test_step(self, batch, batch_idx):
        """Compute and log the test loss/metrics; return the loss."""
        loss, pred, y = self.common_step(batch, batch_idx, 'test')
        return loss

    def common_step(self, batch, batch_idx, stage):
        """Shared forward pass, loss computation and logging.

        Values are logged as '<stage>_<name>' (e.g. 'val_loss'). Per-step
        logging is enabled for every stage except 'val' and 'test'; per-epoch
        logging is always enabled.

        Returns:
            (loss, pred, y)
        """
        x, y = batch
        pred = self.forward(x)
        loss = self.loss_function(pred, y)
        on_step = stage not in ('test', 'val')

        for metric_name, metric in self.metrics.items():
            self.log(stage + '_' + metric_name, metric(pred, y), on_step=on_step, on_epoch=True, prog_bar=True, logger=True)
        self.log(stage + '_' + 'loss', loss, on_step=on_step, on_epoch=True, prog_bar=True, logger=True)
        return loss, pred, y

In [None]:
def objective(trial: optuna.trial.Trial):
  """Optuna objective: mean best validation loss of an MLP over 5-fold CV.

  Samples the number of input features, learning rate, depth, dropout and the
  width of every hidden layer from ``trial``, trains one model per fold with
  early stopping, and averages each fold's best monitored ``val_loss``.

  Reads the notebook-level globals ``X``, ``y`` and ``features_indexes``.

  Args:
    trial: Optuna trial used to sample the hyperparameters.

  Returns:
    float: Mean over folds of the best ``val_loss`` (lower is better).
  """
  num_classes = 5
  task = 'multiclass'
  batch_size = 128
  loss_function = nn.CrossEntropyLoss()
  optimizer = torch.optim.Adam

  in_features = trial.suggest_int("n_best_features", 1, 22)
  learning_rate = trial.suggest_float("learning_rate", 1e-5, 1e-2)
  n_layers = trial.suggest_int("n_layers", 1, 4)
  dropout = trial.suggest_float("dropout", 0.1, 0.7)
  output_dims = [
      trial.suggest_int("n_units_l_{}".format(i), 64, 512, log=True) for i in range(n_layers)
  ]

  # Select columns exactly the way finalFit does, so the searched value of
  # "n_best_features" is evaluated on the same feature subset the final model
  # is trained on.
  tensor_dataset = TensorDataset(
      torch.tensor(X[:, features_indexes[0:in_features]]).float(),
      torch.tensor(y).long(),
  )

  n_splits = 5
  cv_method = KFold(n_splits=n_splits, shuffle=True, random_state=42)
  kfold_losses = []

  # Split over the sample indices of the dataset built above rather than over
  # an unrelated notebook-level variable.
  sample_ids = np.arange(len(tensor_dataset))

  for fold, (train_ids, test_ids) in enumerate(cv_method.split(sample_ids)):

    train_data = torch.utils.data.Subset(tensor_dataset, train_ids)
    val_data = torch.utils.data.Subset(tensor_dataset, test_ids)
    train_dataloader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
    val_dataloader = DataLoader(val_data, batch_size=batch_size, shuffle=False)

    # Fresh metric objects per fold so no metric state is shared between folds.
    metrics = {'accuracy': torchmetrics.Accuracy(task=task, num_classes=num_classes)}
    network = MLPSearch(in_features, num_classes, output_dims, dropout)
    model = ModelCompilation(network, metrics, loss_function, optimizer, learning_rate)

    checkpoint_callback = ModelCheckpoint(monitor='val_loss', filename='model-{epoch:02d}-{val_loss:.2f}-{val_accuracy:.2f}')
    early_stopping_callback = EarlyStopping(monitor="val_loss", mode="min", patience=10)

    trainer = pl.Trainer(callbacks=[early_stopping_callback, checkpoint_callback], precision='32', accelerator="cpu", devices="auto", max_epochs=150, enable_model_summary=False, enable_progress_bar=False)
    trainer.fit(model, train_dataloader, val_dataloader)

    # best_model_score is a tensor; keep plain floats for the average.
    kfold_losses.append(float(checkpoint_callback.best_model_score))

  # Optuna expects a float objective value.
  return float(np.mean(kfold_losses))

# Hyperparameters are selected with TPESampler (Tree-structured Parzen Estimator)
def searchHyperparameters():
  """Run the Optuna study over ``objective`` and return the best parameters.

  Minimizes the objective over 300 trials, prints a short report on the best
  trial, and returns that trial's ``params`` dict.
  """
  # NOTE(review): pruners act on intermediate values reported by the objective;
  # `objective` does not appear to report any, so this pruner is probably
  # inert -- confirm.
  study = optuna.create_study(
      direction="minimize",
      pruner=optuna.pruners.ThresholdPruner(lower=0.03),
  )
  study.optimize(objective, n_trials=300, gc_after_trial=True, timeout=None)

  best = study.best_trial
  print(f"Number of finished trials: {len(study.trials)}")
  print("Best trial:")
  print(f"  Value: {best.value}")
  print("  Params: ")
  for name, chosen in best.params.items():
    print(f"    {name}: {chosen}")
  return best.params

In [None]:
# Run the hyperparameter search and keep the best trial's parameter dict.
model_params = searchHyperparameters()

In [None]:
def load_pretrained_model(model_params, check_point=None):
  """Build the MLP Lightning module described by ``model_params``.

  Args:
    model_params: Dict with the keys ``'n_best_features'``,
      ``'learning_rate'`` and ``'dropout'``, plus one entry per hidden layer
      whose key contains ``'n_units_l'``. Hidden-layer widths are taken in the
      dict's iteration order.
    check_point: Optional path to a Lightning checkpoint. When given, the
      returned module is the one restored from that checkpoint.

  Returns:
    A ``ModelCompilation`` instance: freshly initialised, or restored from
    ``check_point``.
  """
  num_classes = 5
  task = 'multiclass'
  metrics = {'accuracy': torchmetrics.Accuracy(task=task, num_classes=num_classes)}
  loss_function = nn.CrossEntropyLoss()
  optimizer = torch.optim.Adam

  in_features = model_params['n_best_features']
  learning_rate = model_params['learning_rate']
  dropout = model_params['dropout']
  output_dims = [value for key, value in model_params.items() if 'n_units_l' in key]

  network = MLPSearch(in_features, num_classes, output_dims, dropout)
  model = ModelCompilation(network, metrics, loss_function, optimizer, learning_rate)
  if check_point:
    # load_from_checkpoint is a classmethod: call it on the class, not on an
    # instance (newer Lightning releases reject instance calls).
    model = ModelCompilation.load_from_checkpoint(check_point)
  return model

In [None]:
def finalFit(model_params, seed=42):
  """Train the final MLP on an 80/20 split and reload its best checkpoint.

  Reads the notebook-level globals ``X``, ``y`` and ``features_indexes``.

  Args:
    model_params: Hyperparameter dict accepted by ``load_pretrained_model``;
      ``'n_best_features'`` sets how many entries of ``features_indexes``
      are used as input columns.
    seed: Seed for the train/validation split, so the hold-out set is the
      same on every run.

  Returns:
    tuple: ``(best_model, best_model_path)`` -- the module restored from the
    checkpoint with the lowest ``val_loss`` and that checkpoint's path.
  """
  model = load_pretrained_model(model_params)

  in_features = model_params['n_best_features']
  # NOTE(review): the hyperparameter search trains with batch_size=128 while
  # the final fit uses 32 -- confirm this difference is intentional.
  batch_size = 32
  tensor_dataset = TensorDataset(torch.tensor(X[:, features_indexes[0:in_features]]).float(), torch.tensor(y).long())
  train_size = int(0.8 * len(tensor_dataset))
  val_size = len(tensor_dataset) - train_size
  # A seeded generator pins the split; without it every run validates on a
  # different random subset.
  split_generator = torch.Generator().manual_seed(seed)
  train_data, val_data = random_split(tensor_dataset, [train_size, val_size], generator=split_generator)

  train_dataloader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
  val_dataloader = DataLoader(val_data, batch_size=batch_size, shuffle=False)

  checkpoint_callback = ModelCheckpoint(monitor='val_loss', filename='model-{epoch:02d}-{val_loss:.2f}-{val_accuracy:.2f}')
  early_stopping_callback = EarlyStopping(monitor="val_loss", mode="min", patience=20)

  trainer = pl.Trainer(callbacks=[early_stopping_callback, checkpoint_callback], precision='32', accelerator="cpu", devices="auto", max_epochs=150)
  trainer.fit(model, train_dataloader, val_dataloader)
  best_model = load_pretrained_model(model_params, checkpoint_callback.best_model_path)
  return best_model, checkpoint_callback.best_model_path

In [None]:
# Train the final model: `network` is the module reloaded from the best
# checkpoint and `checkpoint` is that checkpoint's file path.
network, checkpoint = finalFit(model_params)

In [None]:
# Show the path of the best checkpoint returned by finalFit.
checkpoint

'/content/lightning_logs/version_1577/checkpoints/model-epoch=40-val_loss=0.11-val_accuracy=0.95.ckpt'

Saving MLP

In [None]:
from google.colab import files

# Download the best checkpoint file through the Colab helper (requires Google Colab).
files.download(checkpoint)

In [None]:
import pickle

# Persist the selected hyperparameter dict next to the checkpoint; it is the
# `model_params` argument that load_pretrained_model expects.
with open('mlp_parameters.pkl', 'wb') as file:
    pickle.dump(model_params, file)

In [None]:
import joblib

# Persist `scaler` (defined earlier in the notebook; presumably the fitted
# feature scaler -- confirm) so the same preprocessing can be reapplied later.
joblib.dump(scaler, 'mlp_scaler.pkl')

['mlp_scaler.pkl']