In [None]:
import os
import re
import gc
import operator
import pickle
from copy import deepcopy
from statistics import mean
import warnings

import numpy as np

from xgboost import (XGBClassifier)

from sklearn.model_selection import train_test_split, StratifiedGroupKFold,StratifiedKFold
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.metrics import accuracy_score, f1_score
from sklearn.manifold import TSNE

import tensorflow_hub as hub

import gensim
import gensim.models as g
import gensim.downloader

from spacy.tokenizer import Tokenizer
from spacy.lang.en import English
from sentence_transformers import SentenceTransformer

from transformers import BertTokenizer, TFBertModel, BertModel, AutoTokenizer, AutoModel, AutoModelForSequenceClassification
from transformers import TrainingArguments, Trainer
from datasets import load_metric
import torch


import optuna
from optuna.samplers import TPESampler

# Манипулирование данными
import pandas as pd # для манипулирования данными

# Визуализация
import plotly.express as px # для визуализации данных
import matplotlib.pyplot as plt # для отображения рукописных цифр

In [None]:
# Suppress Python warnings and restrict Optuna's logger to WARNING and above,
# so the tuning runs further down do not flood the cell output.
warnings.filterwarnings('ignore')
optuna.logging.set_verbosity(optuna.logging.WARNING)

In [None]:
def return_all_files_texts(prefix_file='./CO_fold/'):
    """
    Load every ``.txt`` document of a directory together with its ``.ann`` file.

    Parameters
    ----------
    prefix_file : str
        Directory to scan. A trailing path separator is optional: paths are
        built with ``os.path.join`` rather than by string concatenation.

    Returns
    -------
    tuple of (list of str, list of str)
        ``(texts, annotations)``, index-aligned. Entry ``k`` of each list holds
        the content of ``<name>.txt`` and ``<name>.ann`` respectively, in the
        order in which ``os.listdir`` reports the files.

    Raises
    ------
    FileNotFoundError
        If a ``.txt`` file has no sibling ``.ann`` file.
    """
    text_file_list = []
    annotation_file_list = []
    for file_name in os.listdir(prefix_file):
        if not file_name.endswith('.txt'):
            continue
        text_path = os.path.join(prefix_file, file_name)
        annotation_path = os.path.join(prefix_file, file_name[:-4] + '.ann')
        with open(text_path) as file:
            text_file_list.append(file.read())
        with open(annotation_path) as file:
            annotation_file_list.append(file.read())
    return text_file_list, annotation_file_list
def parse_ann_file(text):
    """
    Parse the content of an annotation file into a list of dicts.

    Every non-empty line is expected to consist of three tab-separated fields;
    the second one is a whitespace-separated "<name> <start> ... <end>" group
    and the third one is the annotated text. Each line becomes
    ``{'name', 'start', 'end', 'text'}`` where ``start`` is the first and
    ``end`` the last number of the group.
    """
    annotations = []
    for line in text.split('\n'):
        if line == '':
            continue
        fields = line.split('\t')
        span, covered_text = fields[1].split(), fields[2]
        annotations.append({
            'name': span[0],
            'start': int(span[1]),
            'end': int(span[-1]),
            'text': covered_text,
        })
    return annotations
def highlight_description(ann_text, raw_text):
    """
    Wrap every 'Description' annotation of a document in inline markers.

    ann_text is a list of annotation dicts with 'name', 'start' and 'end' keys
    (the shape produced by parse_ann_file); raw_text is the document string.
    Returns raw_text with '{__startdesc__}' and '{__enddesc__}' inserted
    around each 'Description' span.
    """
    filtered_list = list(filter(lambda x: x['name']=='Description', ann_text))
    # Walk the annotations from the largest 'end' offset backwards, so text
    # inserted for one annotation lies after the offsets of the annotations
    # still to be processed (this assumes the spans do not overlap).
    filtered_list = sorted(filtered_list, key=lambda x: x['end'], reverse=True) 
    for i in filtered_list:
        # NOTE(review): raw_text[i['end']] raises IndexError when 'end' equals
        # len(raw_text), i.e. when a span runs to the very end of the document.
        if raw_text[i['end']]=='.':
            # The span is followed by a period: place the end marker right before it.
            raw_text = raw_text[:i['end']]+'{__enddesc__}'+raw_text[i['end']:]
        else:
            # NOTE(review): k counts the spaces/tabs immediately before 'end',
            # but it is never used afterwards - the insertion below always
            # happens at end-1, i.e. before the last character of the span.
            k = 1
            while raw_text[i['end'] - k]==' ' or raw_text[i['end'] - k]=='\t':
                k += 1
            raw_text = raw_text[:i['end']-1]+'{__enddesc__}'+raw_text[i['end']-1:]
        # The start marker replaces the character at index start-1 (that
        # character is dropped from the text).
        # NOTE(review): for start == 0 the slice [:start-1] becomes [:-1] -
        # confirm that no annotation starts at offset 0.
        raw_text = raw_text[:i['start']-1]+'{__startdesc__}'+raw_text[i['start']:]
    return raw_text
def replace_all(text, dic):
    """
    Apply several substring replacements to a string.

    The replacements are applied one after another in the iteration order of
    ``dic`` (key -> replacement), so a later rule sees the output of the
    earlier ones.
    """
    result = text
    for needle in dic:
        result = result.replace(needle, dic[needle])
    return result
def soft_clean(text):
    """
    Drop lines that consist only of markup commands and flatten the document.

    Lines that fully match the command pattern are removed, the remaining
    lines are joined with single spaces and lower-cased, and a few dotted
    abbreviations are rewritten without their dots (so that a later split on
    '.' does not cut inside them).
    """
    abbreviations = {'a.k.a.': 'aka', 'e.g.':'eg', 'resp.':'resp', 't.s.':'ts', 't.i.': 'ti'}
    command_only_line = r'(\\)(\w+)(=\w+|\W*|(\[\w+\])+(\{\w+\})+|(\{\w+\}+)+)\s*\n*|((\\)(\w+))+\s*\n|(\%+\s*\n)'
    kept_lines = [
        line for line in text.split('\n')
        if re.fullmatch(command_only_line, line) is None
    ]
    flattened = ' '.join(kept_lines).lower()
    return replace_all(flattened, abbreviations)
def start_of_text(raw_text, start_of_raw):
    """
    Return the document text starting at its first content sentence.

    Parameters
    ----------
    raw_text : str
        The full document text.
    start_of_raw : str
        The exact string with which the article body begins.

    Returns
    -------
    str
        ``raw_text`` from the first occurrence of ``start_of_raw`` onwards.
        When the marker does not occur, ``raw_text`` is returned unchanged:
        slicing with the ``-1`` that ``str.find`` returns in that case would
        silently keep only the last character of the document.
    """
    position = raw_text.find(start_of_raw)
    if position == -1:
        return raw_text
    return raw_text[position:]
def add_clean(text):
    """
    Additional cleaning of a sentence before vectorisation.

    Braces are replaced by spaces (``}_{`` collapses to ``_``), ``$`` and the
    ``\\[`` / ``\\]`` delimiters are surrounded by spaces so they become
    separate tokens, a lone `` \\ `` is dropped, and finally runs of spaces are
    squeezed into one. The rules are applied in the order listed below.
    """
    # Backslashes are written as '\\' throughout: '\[' and '\]' are invalid
    # escape sequences in a normal string literal and trigger a deprecation
    # warning, although they denote the same two characters.
    dict_for_repl = {
        '}_{': '_',
        '\\{': ' ',
        '\\}': ' ',
        '{': ' ',
        '}': ' ',
        '$': ' $ ',
        ' \\ ': ' ',
        '\\[': ' \\[ ',
        '\\]': ' \\] ',
    }
    return re.sub(' +', ' ', replace_all(text, dict_for_repl))
def hamming_distance(list1,list2):
    """
    Print and return the number of positions at which two sequences differ.

    Every mismatching pair is printed together with its index, followed by the
    total. The sequences are compared pairwise with ``zip``, so when their
    lengths differ the surplus tail of the longer one is not counted.

    Returns
    -------
    int
        The number of mismatching positions (previously it was only printed).
    """
    result = 0
    for x, (i, j) in enumerate(zip(list1, list2)):
        if i != j:
            print(f'char not math{i,j}in {x}')
            result += 1
    print(f"Расстояние Хэмминга = {result}")
    return result
_SENTENCE_TOKENIZER = None


def _get_sentence_tokenizer():
    """Create the spaCy tokenizer on first use and reuse it afterwards."""
    global _SENTENCE_TOKENIZER
    if _SENTENCE_TOKENIZER is None:
        _SENTENCE_TOKENIZER = Tokenizer(English().vocab)
    return _SENTENCE_TOKENIZER


def vectorize_sentence(sentence,model):
    """
    Average the word vectors of a sentence (used with GloVe and Word2Vec).

    Parameters
    ----------
    sentence : str
        Text to embed; it is tokenised with a spaCy ``Tokenizer`` built on the
        English vocabulary.
    model
        Object exposing ``get_vector(word)`` that raises ``KeyError`` for
        unknown words (the gensim keyed-vectors interface - confirm for other
        models).

    Returns
    -------
    numpy.ndarray
        Mean of the vectors of all tokens known to ``model``. When no token is
        known, a zero vector is returned; its length is ``model.vector_size``
        when the model exposes it and 300 otherwise.
    """
    zero_size = getattr(model, 'vector_size', 300)
    vectors = []
    for token in _get_sentence_tokenizer()(sentence):
        try:
            vectors.append(model.get_vector(str(token)))
        except KeyError:
            # Out-of-vocabulary token: skip it, anything else should surface.
            continue
    if not vectors:
        return np.zeros(zero_size)
    averaged = np.array(vectors).mean(axis=0)
    # Kept from the original: an all-NaN average is replaced by zeros.
    if np.all(np.isnan(averaged)):
        return np.zeros(zero_size)
    return averaged

### Обработка статей


In [None]:
# Show the files of the annotated corpus directory
print(os.listdir('./CO_fold'))

In [None]:
# Read the dataset: raw article texts and their annotation files (index-aligned lists)
text_file_list, annotation_file_list = return_all_files_texts()

In [None]:
# Opening string of each article body, one entry per document; everything
# before it is cut away by start_of_text. Strings containing backslashes are
# raw literals, because "\i", "\m" and "\c" are invalid escape sequences in
# normal literals (the resulting values are the same).
# NOTE(review): entries are matched to documents by position, so this list has
# to follow the order in which the files are loaded - confirm on each machine.
start_of_raw_list = [
    r"{__startdesc__}Given a positive integer $n \in \mathbb{Z}_{+}$",
    "The purpose of this paper is to give a class of reconstructible graphs.",
    "{__startDesc__}Let $B$ be properly $n$- colored bipartite multigraph with $n+1$ edges of each color{__endDesc__}. ",
    "Exercise VII.47 of [PS] (brought to my attention by Richard Stanley),",
    "Let $R=",
    r"Let $G$ be a simple graph.{__startDesc__}The collection $D(G)=(G_v)_{v\in V(G)}$",
    "{__startDesc__}A",
    "{__startDesc__}Stern's diatomic sequence $a_1=1,  a_{2n}=a_n,  a_{2n+1}=a_n+a_{n+1}$",
    "We prove a classification theorem for Hankel weighing matrices.",
    r"This billet should be regarded as a footnote to \cite{GL}",
    "Since  very few papers  concern  maximal symplectic partial spreads in dimension $>4$",
    "{__startDesc__}The Rogers-Ramanujan identities",
]
# The cleaned documents are lower-cased, so the anchors have to be as well.
start_of_raw_list = [i.lower() for i in start_of_raw_list]

In [None]:
# Turn every document into a list of sentence fragments: mark the annotated
# descriptions, strip markup-only lines, cut the preamble, split on '.'
for doc_index, document in enumerate(text_file_list):
    annotations = parse_ann_file(annotation_file_list[doc_index])
    cleaned = soft_clean(highlight_description(annotations, document))
    body = start_of_text(cleaned, start_of_raw_list[doc_index])
    text_file_list[doc_index] = body.split('.')

In [None]:
# Build the sentence list and the class labels.
# A fragment is positive (1) only when it contains both the start and the end
# marker. The markers are stripped from every fragment: a description broken
# up by the '.' split leaves fragments with a single marker, and that literal
# marker text would otherwise end up in the features of negative examples.
all_sent = []
labels = []
for fragments in text_file_list:
    for fragment in fragments:
        is_description = '{__startdesc__}' in fragment and '{__enddesc__}' in fragment
        all_sent.append(fragment.replace('{__startdesc__}', '').replace('{__enddesc__}', ''))
        labels.append(1 if is_description else 0)

In [None]:
# Additional cleaning of every sentence
all_sent = [add_clean(i) for i in all_sent]
# Close math delimiters: a sentence with an odd number of '$' was cut inside a
# formula by the '.' split, so a '$' is appended to it and prepended to the
# next sentence (which is then re-checked on the following iteration).
for i in range(len(all_sent)):
    if all_sent[i].count('$') % 2 == 1:
        all_sent[i] += '$'
        # The last sentence has no successor; indexing i+1 there raised IndexError.
        if i + 1 < len(all_sent):
            all_sent[i + 1] = '$' + all_sent[i + 1]
    

In [None]:
# Number of examples, number of positive labels, and a sample of the labels
print(len(labels), sum(labels), labels[:15], sep='\n')

In [None]:
# The raw documents are no longer needed once all_sent and labels exist;
# drop the references and run a garbage collection pass.
del text_file_list, annotation_file_list
gc.collect()

In [None]:
# Load additional documents used for fitting TF-IDF and BoW; their annotation
# contents are discarded (return_all_files_texts still opens an .ann file for
# every .txt file it finds).
add_list_for_stat, _ =   return_all_files_texts('../download/')

In [None]:
# Clean the additional documents and split them into sentence fragments
for i, raw in enumerate(add_list_for_stat):
    add_list_for_stat[i] = soft_clean(raw).split('.')
# Flatten to one list of cleaned sentences
extra_sentence = [
    add_clean(fragment)
    for document in add_list_for_stat
    for fragment in document
]

In [None]:
# Corpus for fitting the BoW / TF-IDF vectorizers: extra sentences followed by the labelled ones
full_sent = extra_sentence + all_sent

# Подборка векторного представления

### K-Fold with optimizing


In [None]:
def objective(trial: optuna.Trial) -> tuple:
    """
    Optuna objective: cross-validated XGBoost on the current feature matrix.

    Reads the module-level feature matrix ``X`` and label vector ``y`` (set by
    the cell that runs the study), samples an XGBoost configuration from
    ``trial`` and evaluates it with 5-fold stratified cross-validation.

    Returns
    -------
    tuple of (float, float)
        Mean F1 score and mean accuracy over the folds, in this order; the
        study is therefore expected to have two "maximize" directions.
    """
    # X and y are only read here, so no ``global`` declaration is needed.
    param = {
        "verbosity": 0,
        "objective": trial.suggest_categorical("objective",["binary:logistic","binary:logitraw","binary:hinge"]),
        # use exact for small dataset.
        "tree_method": "exact",
        # defines booster, gblinear for linear functions.
        "booster": trial.suggest_categorical("booster", ["gbtree", "gblinear", "dart"]),
        # L2 regularization weight.
        "lambda": trial.suggest_float("lambda", 1e-8, 1.0, log=True),
        # L1 regularization weight.
        "alpha": trial.suggest_float("alpha", 1e-8, 1.0, log=True),
        # sampling ratio for training data.
        "subsample": trial.suggest_float("subsample", 0.2, 1.0),
        # sampling according to each tree.
        "colsample_bytree": trial.suggest_float("colsample_bytree", 0.2, 1.0),
        # NOTE(review): hard-coded class weight; presumably the
        # negative/positive ratio of the dataset (equals 3914/77) - confirm
        # and consider deriving it from y.
        "scale_pos_weight": 50.8311688311688
    }

    if param["booster"] in ["gbtree", "dart"]:
        # maximum depth of the tree, signifies complexity of the tree.
        param["max_depth"] = trial.suggest_int("max_depth", 3, 30, step=2)
        # minimum child weight, larger the term more conservative the tree.
        param["min_child_weight"] = trial.suggest_int("min_child_weight", 2, 10)
        param["eta"] = trial.suggest_float("eta", 1e-8, 1.0, log=True)
        # defines how selective algorithm is.
        param["gamma"] = trial.suggest_float("gamma", 1e-8, 1.0, log=True)
        param["grow_policy"] = trial.suggest_categorical("grow_policy", ["depthwise", "lossguide"])

    if param["booster"] == "dart":
        param["sample_type"] = trial.suggest_categorical("sample_type", ["uniform", "weighted"])
        param["normalize_type"] = trial.suggest_categorical("normalize_type", ["tree", "forest"])
        param["rate_drop"] = trial.suggest_float("rate_drop", 1e-8, 1.0, log=True)
        param["skip_drop"] = trial.suggest_float("skip_drop", 1e-8, 1.0, log=True)

    # Fixed seed: every trial is scored on the same folds.
    skf = StratifiedKFold(5, shuffle=True, random_state=33)
    f1_list = []
    acc_list = []
    for train_index, test_index in skf.split(X, y):
        gbm = XGBClassifier(**param)
        gbm.fit(
            X[train_index],
            y[train_index],
            verbose=0,
        )
        preds = gbm.predict(X[test_index])
        pred_labels = np.rint(preds)
        f1_list.append(f1_score(y[test_index], pred_labels))
        acc_list.append(accuracy_score(y[test_index], pred_labels))
    return mean(f1_list), mean(acc_list)
def train_optuna_func(n_trials=120):
    """
    Run a two-objective Optuna study on the current features and report it.

    The study maximises both values returned by ``objective`` (F1, accuracy)
    with a seeded TPE sampler. Among the Pareto-optimal trials the one with
    the highest first value (F1) is printed with its parameters.

    Parameters
    ----------
    n_trials : int
        Number of trials to run (default 120).

    Returns
    -------
    optuna.study.Study
        The finished study, so the caller can inspect it further.
    """
    sampler = TPESampler(seed=10)  # Make the sampler behave in a deterministic way.
    study = optuna.create_study(directions=["maximize", "maximize"], sampler=sampler)
    study.optimize(objective, n_trials=n_trials)
    trial = max(study.best_trials, key=lambda i: i.values[0])
    # The former ``print(best)`` was removed: ``best`` is not defined in this
    # function, so it either raised NameError or printed an unrelated global.
    print("  Value: {}".format(trial.values))

    print("  Params: ")
    for key, value in trial.params.items():
        print("    {}: {}".format(key, value))
    return study

def train_optuna_func_4search(n_trials=50, seed=10):
    """
    Run a two-objective Optuna study on the current features without printing.

    Used inside the vectorizer grid search, where only the finished study is
    needed.

    Parameters
    ----------
    n_trials : int
        Number of trials to run (default 50).
    seed : int
        Seed of the TPE sampler (default 10), which makes the sampler behave
        in a deterministic way.

    Returns
    -------
    optuna.study.Study
        The finished study; both values of ``objective`` are maximised.
    """
    study = optuna.create_study(
        directions=["maximize", "maximize"],
        sampler=TPESampler(seed=seed),
    )
    study.optimize(objective, n_trials=n_trials)
    return study

In [None]:
# Label vector read by objective(); np.array already copies the list
y = np.array(labels)

### BOW

In [None]:
# Grid over the CountVectorizer settings; every setting gets its own Optuna study.
study_list = []
min_df = 5
for i in range(3):
    for j in range(2):
        for max_df in [.8, .9, .99]:
            ngram_range = (1 + j, 3 + i)
            print("ngram_range = (", 1 + j,";", 3 + i,") max_df = ", max_df," min_df", min_df)
            # min_df / max_df remove rare and overly common terms
            bow = CountVectorizer(min_df=min_df, max_df=max_df, ngram_range=ngram_range)
            bow.fit(full_sent)
            # X is the module-level feature matrix read by objective()
            X = bow.transform(all_sent)
            del bow
            gc.collect()
            study_list.append({"ngram_range": ngram_range, "max_df": max_df, "min_df": min_df, "score": train_optuna_func_4search()})

# Pick the setting whose best Pareto trial has the highest F1 (values[0]).
# The key has to be that F1 value: returning the trial object itself makes
# max() compare trial objects instead of scores.
best = max(study_list, key=lambda entry: max(t.values[0] for t in entry['score'].best_trials))
trial = max(best['score'].best_trials, key=lambda t: t.values[0])
print(best)
print("  Value: {}".format(trial.values))

print("  Params: ")
for key, value in trial.params.items():
    print("    {}: {}".format(key, value))

### TF-IDF

In [None]:
print("TF-IDF")
# Grid over the TfidfVectorizer settings; every setting gets its own Optuna study.
study_list = []
min_df = 5
for i in range(3):
    for j in range(2):
        for max_df in [.8, .9, .99]:
            ngram_range = (1 + j, 3 + i)
            print("ngram_range = (", 1 + j,";", 3 + i,") max_df = ", max_df," min_df", min_df)
            # min_df / max_df remove rare and overly common terms
            tfidf = TfidfVectorizer(min_df=min_df, max_df=max_df, ngram_range=ngram_range)
            tfidf.fit(full_sent)
            # X is the module-level feature matrix read by objective()
            X = tfidf.transform(all_sent)
            del tfidf
            gc.collect()
            study_list.append({"ngram_range": ngram_range, "max_df": max_df, "min_df": min_df, "score": train_optuna_func_4search()})

# Pick the setting whose best Pareto trial has the highest F1 (values[0]).
# The key has to be that F1 value: returning the trial object itself makes
# max() compare trial objects instead of scores.
best = max(study_list, key=lambda entry: max(t.values[0] for t in entry['score'].best_trials))
trial = max(best['score'].best_trials, key=lambda t: t.values[0])
print(best)
print("  Value: {}".format(trial.values))

print("  Params: ")
for key, value in trial.params.items():
    print("    {}: {}".format(key, value))

### Word2Vec

In [None]:
# Sentence features: vectorize_sentence applied to every sentence with the
# pretrained 'word2vec-google-news-300' vectors.
word2vec = gensim.downloader.load('word2vec-google-news-300') #1.66 gb
# X is the module-level feature matrix read by objective()
X = np.array([vectorize_sentence(i, model=word2vec) for i in all_sent])
# Release the embedding model before the tuning run
del word2vec
gc.collect()
# Tune XGBoost on these features; the call's return value is kept in `a`
a = train_optuna_func()

### GloVe

In [None]:
# Sentence features: vectorize_sentence applied to every sentence with the
# pretrained 'glove-wiki-gigaword-300' vectors.
gv = gensim.downloader.load('glove-wiki-gigaword-300') #376mb
# X is the module-level feature matrix read by objective()
X = np.array([vectorize_sentence(i, model=gv) for i in all_sent])
# Release the embedding model before the tuning run
del gv
gc.collect()
train_optuna_func()

### USE

In [None]:
# Load a TensorFlow Hub model from a local directory
# (presumably the Universal Sentence Encoder, per the section title - confirm).
universal = hub.load("./transformer_model")
# The model is called once per sentence with a one-element list; the results
# are stacked row-wise into X, the module-level matrix read by objective().
X = np.vstack(np.array([universal([i]) for i in all_sent]))
# Release the model before the tuning run
del universal
gc.collect()
train_optuna_func()

### BERT

In [None]:
# Sentence embeddings from the 'stsb-roberta-large' SentenceTransformer model
bert = SentenceTransformer('stsb-roberta-large') #1.3 gb
# Each sentence is encoded separately; X is the module-level matrix read by objective()
X = np.array([bert.encode(i) for i in all_sent])
# Release the model before the tuning run
del bert
gc.collect()
train_optuna_func()

### MathBERT

In [None]:
# Sentence embeddings from the "tbs17/MathBERT" checkpoint loaded through SentenceTransformer
# NOTE(review): confirm how SentenceTransformer pools this checkpoint into
# sentence vectors - it may not ship a sentence-transformers configuration.
bert = SentenceTransformer("tbs17/MathBERT")
# Each sentence is encoded separately; X is the module-level matrix read by objective()
X = np.array([bert.encode(i) for i in all_sent])
# Release the model before the tuning run
del bert
gc.collect()
train_optuna_func()