# NLU Classifier

In [None]:
# Импортирование модулей сервиса
from ConversationManager import ConversationManager
from NerExtractor        import NerExtractor

import torch
from sentence_transformers import SentenceTransformer, util
from transformers import AutoTokenizer, AutoModel
import docx
import re
from langchain.vectorstores import Chroma
from langchain.docstore.document import Document


class NLU_Classifier:
    """
    Классификатор для обработки естественного языка (NLU), специализирующийся на поиске и классификации терминов и запросов.
    """
    def __init__(self, nlu_model="../../local_huggingface/sber_large_mt_nlu_ru", file_path):
        """
        Инициализирует NLU_Classifier с заданной моделью NLU.

        Параметры:
            nlu_model (str): Путь к предобученной модели NLU.
        """
        self.synonyms_dicts = [['ПАК ЗВП', 'PTAF', 'WAF', 'ПТАФ', 'ВАФ', 'программно-аппаратный комплекс защиты веб приложений', 'веб-файервол', 'web firewall'],
                                 ['Антивирус', 'KSC', 'Касперский', 'Kaspersky', 'антивирус касперского', 'антивирусная защита', 'АВЗ', 'комплексная система антивирусной защиты', 'КСАЗ'],
                                 ['NGate', 'шифрование по ГОСТ', 'ГОСТ шифрование', 'НГейт', 'TLS-ГОСТ'],
                                 ['Hashicorp Vault', 'Vault', 'Вольт', 'Хашикорп вольт', 'менеджер секретов', 'система хранения секретов', 'система управления секретами'],
                                 ['КриптоПро', 'CryptoPro', 'CSP', 'криптопровайдер'],
                                 ['СТП', 'ТП', 'техподдержка', 'техническая поддержка', 'служба технической поддержки'],
                                 ['Центр обеспечения безопасности', 'Центр обеспечения информационной безопасности', 'Security Operations Center', 'SOC', 'СОК'],
                                 ['NTP', 'сервис точного времени', 'сервис времени', 'Network Time Protocol'],
                                 ['СЗИ', 'система защиты информации'],
                                 ['СрЗИ', 'средство защиты информации'],
                                 ['СКЗИ', 'средство криптозащиты', 'средство криптографической защиты'],
                                 ['ЭП', 'электронная подпись']
                                 # ['ИС', 'информационная система', 'веб-приложение ИС', 'веб приложение ИС']
                                 ]

        self.aditional_data =  ['ПАК ЗВП это программно-аппаратный комплекс защиты веб приложений производства Positive Technologies. Подробнее про это можно прочитать тут: https://www.ptsecurity.com/ru-ru/products/af/']

        print('Loading retriver model...')
        # Загрузка токенизатора для предварительно обученной модели NLU
        self.retriver_tokenizer = AutoTokenizer.from_pretrained(nlu_model)

        # Загрузка предварительно обученной модели NLU с переносом на GPU для ускорения обработки
        self.retriver_model = AutoModel.from_pretrained(nlu_model).to(device='cuda')
        print('Retriver model loaded')

        # Хранится файл на базу знаний, из которого создаётся RAG
        self.file_path = file_path

        # Загрузка данных из файла
        data_list, sections = self.get_data(self.file_path)
        self.data_list = data_list + self.aditional_data
        self.sections = sections + self.aditional_data

        # Вычисление векторных представлений для списка данных
        self.sentence_embeddings = self.get_embenddings(self.sections, max_length=12)
        self.sentence_embeddings.to(device='cpu')

        # Инициализация NerExtractor
        self.extractor = NerExtractor()

    def get_data(file_path: str):
        """
        Получение данных из файла.

        Args:
        file_path (str): Путь к файлу, из которого необходимо извлечь данные.

        Returns:
        tuple: Возвращает кортеж из двух элементов - список всех данных и список секций.

        Описание:
        Функция считывает данные из указанного файла и организует их в удобный для дальнейшей
        обработки формат. Разделяет данные на отдельные секции для упрощения доступа.
        """
        # Загрузка документа
        doc = docx.Document(file_path)

        sections = []
        all_data = []
        data_str = ' '
        block_name = None
        old_sec = None
        link = None

        # Регулярное выражение для извлечения ссылки и названия секции
        link_pattern = re.compile(r'\((https?://[^\s]+)\)')
        section_pattern = re.compile(r'«(.*?)»')

        # Итерируем по параграфам документа
        for paragraph in doc.paragraphs:
            text = paragraph.text.strip()

            if 'Страница' in text:
                # Обработка названия секции
                sec_match = section_pattern.search(text)
                sec = sec_match.group(1) if sec_match else text

                # Обработка данных предыдущего блока
                if data_str.strip() and block_name:
                    if '\tПДн' in data_str and len(data_str) > 1000:
                        list_str = []
                    elif '\nРаздел в разработке' in data_str:
                        data_str = ' '
                        list_str = []
                    else:
                        list_str = [data_str[2:] + ' Подробнее про это можно прочитать тут: ' + link]

                    if list_str:
                        all_data += list_str
                        data_str = ' '
                        sections.append(old_sec)

                old_sec = sec
                block_name = text

                # Поиск ссылки в тексте
                link_match = link_pattern.search(text)
                link = link_match.group(1) if link_match else None
                continue

            data_str += '\n' + text

        # Добавление последнего блока данных
        if data_str.strip():
            list_str = [data_str + ' Подробнее про это можно прочитать тут: ' + link]
            all_data += list_str

        return all_data[:-1], sections

    def search_in_context(self, query, sentence_embeddings, model, tokenizer, data_list, treshold, top_k=5):
        """
        Поиск в контексте с использованием векторных представлений.

        Описание:
        Функция выполняет поиск по векторным представлениям, используя косинусное сходство,
        для определения наиболее релевантных предложений из списка данных.
        """
        from sentence_transformers import util

        # Получите векторные представления для запроса
        query_embedding = self.get_embenddings(query, max_length=12)

        # Используйте косинусное сходство для поиска наиболее похожих контекстов
        similarities = util.pytorch_cos_sim(query_embedding, sentence_embeddings)[0]

        # Получите индексы наиболее похожих контекстов
        top_k_indices = similarities.argsort(descending=True)[:top_k]

        # # Верните наиболее похожие контексты
        # results = [data_list[i] for i in top_k_indices]

        # Получаем оценки похожести
        similarity_scores = [similarities[i].item() for i in top_k_indices]

        # Получаем наиболее похожие контексты
        results = [data_list[i] for j, i in enumerate(top_k_indices) if similarity_scores[j] > treshold]

        return results

    def search_in_context_with_score(self, query, sentence_embeddings, model, tokenizer, data_list, treshold, top_k=5):
        """
        Поиск в контексте с оценками сходства.

        Описание:
        Функция выполняет поиск по векторным представлениям с возвращением оценок сходства,
        позволяя оценить степень релевантности каждого из результатов поиска.
        """
        from sentence_transformers import util

        # Получите векторные представления для запроса
        query_embedding = self.get_embenddings(query, max_length=12)

        # Используйте косинусное сходство для поиска наиболее похожих контекстов
        similarities = util.pytorch_cos_sim(query_embedding, sentence_embeddings)[0]

        # Получите индексы наиболее похожих контекстов
        top_k_indices = similarities.argsort(descending=True)[:top_k]

        # Вернет наиболее похожие контексты
        # results = [data_list[i] for i in top_k_indices]

        # Получаем оценки похожести
        similarity_scores = [similarities[i].item() for i in top_k_indices]

        # Получаем наиболее похожие контексты
        results = [(data_list[i],similarity_scores[j]) for j, i in enumerate(top_k_indices) if similarity_scores[j] > treshold]

        return results

    def get_context(self, question: str):
        """
        Получает контекст для заданного вопроса.

        Эта функция использует список синонимов для расширения поиска, пытаясь найти
        наиболее подходящий контекстный ответ на вопрос.

        Параметры:
            question (str): Вопрос для обработки.

        Возвращает:
            str: Контекстный ответ на вопрос.
        """

        # Приведение вопроса к нижнему регистру для унификации поиска
        question = question.lower()

        # Инициализация списка для проверки синонимов
        check_list = []

        # Перебор списка синонимов для расширения запроса
        for synonyms in self.synonyms_dicts:
            for synonym in synonyms:
                if synonym.lower() in question:
                    check_list += [question.replace(synonym.lower(), el) for el in synonyms]

        # Список для контекстных ответов
        context_responses = []

        # Ищем контекстные ответы для всех вариаций вопроса
        if check_list:
            for quest in check_list:
                context_responses += [el[0] for el in sorted(
                    self.search_in_context_with_score(quest, self.sentence_embeddings, self.retriver_model, self.retriver_tokenizer, self.data_list, 0.22, top_k=5),
                    reverse=True, key=lambda x: x[1])]
        else:
            context_responses = self.search_in_context(question, self.sentence_embeddings, self.retriver_model, self.retriver_tokenizer, self.data_list, 0.22, top_k=5)

        # Определяем, является ли запрос релевантным
        is_relevant = len(context_responses) > 0

        # Возвращаем кортеж из флага релевантности и ответов
        return is_relevant, '\n\n'.join(context_responses[:3]) if context_responses else ' '

    def extract_named_entities(question):
        """
        Извлекает именованные сущности из вопроса, классифицирует их, и возвращает соответствующий контент.

        Эта функция использует NER (Named Entity Recognition) для идентификации именованных сущностей в заданном вопросе.
        На основе этих сущностей функция ищет соответствующие классы и подклассы в данных, и возвращает связанный с ними контент.

        Возвращает:
            Строка с контентом, соответствующим именованным сущностям вопроса, и ссылкой для дополнительной информации.

        Исключения:
            Возвращает пустую строку, если не найдены соответствующие именованные сущности или классы.
        """
        # NER
        # Получение именнованых сущностей
        entries = self.extractor.get_entities(question)
        ent = [question[el[-2]:el[-1]] for el in entries]

        if ent != []:
            # Получение классов с именоваными сущностями
            cl_s = {el[1]:el[0].page_content  for el in cl_s if ent[0] in el[0].page_content}
        elif cl_s == []:
            return ' '

        else:
            tp_class = sorted(cl_s, key=lambda x: x[1])[0][0].page_content
            test = tp_class

        if isinstance(cl_s, dict):
            if list(cl_s.keys()) != []:
                # print(clas.keys())
                tp_class = cl_s[min(cl_s.keys())]

                # Получение подклассов
                subclasses = Chroma.from_documents([Document(page_content=el) for el in data[tp_class].keys()], embeddings, collection_name=''.join(random.choice(characters) for _ in range(10)))

                # Получение ближайших под_классов
                tp_subclass = subclasses.similarity_search(question, k = 3)
                return '\n\n'.join([data[tp_class][el.page_content] for el in tp_subclass])  + ' Подробнее прочитать об этом вы можете тут: ' + tp_class
        else:
            # Получение подклассов
            subclasses = Chroma.from_documents([Document(page_content=el) for el in data[tp_class].keys()], embeddings, collection_name=''.join(random.choice(characters) for _ in range(10)))

            # Получение ближайших под_классов
            tp_subclass = subclasses.similarity_search(question, k = 3)
            test = tp_subclass
            return '\n\n'.join([data[tp_class][el.page_content] for el in tp_subclass]) + ' Подробнее прочитать об этом вы можете тут: ' + tp_class


    def get_entities(self, text: str):
        """
        Извлекает сущности из предоставленного текста.

        Параметры:
            text (str): Текст для извлечения сущностей.

        Возвращает:
            list: Список извлеченных сущностей.
        """
        return self.extractor.get_entities(text)

    def get_embenddings(self, data_list, max_length=12):
        """
        Вычисляет векторные представления для списка данных.

        Параметры:
            data_list (list): Список данных для обработки.
            max_length (int, optional): Максимальная длина вектора. По умолчанию 12.

        Возвращает:
            torch.Tensor: Тензор векторных представлений.
        """
        # Токенизация входного списка данных с заданными параметрами.
        # padding=True обеспечивает одинаковую длину всех последовательностей.
        # truncation=True усекает данные, превышающие max_length.
        # return_tensors='pt' возвращает тензоры PyTorch.
        try:
            encoded_input = self.retriver_tokenizer(data_list, padding=True, truncation=True, max_length=64, return_tensors='pt')

            # Перемещение данных на GPU для ускорения вычислений (если доступно).
            encoded_input = {key: value.to('cuda') for key, value in encoded_input.items()}

            # Вычисление векторных представлений без обучения модели (no_grad).
            with torch.no_grad():
                model_output = self.retriver_model(**encoded_input)

            # Вызов функции mean_pooling для получения усредненных векторных представлений.
            return self.mean_pooling(model_output, encoded_input['attention_mask'])
        except Exception as e:
            print(f"Ошибка при получении векторных представлений: {e}")

    def mean_pooling(self, model_output, attention_mask):
        """
        Выполняет усреднение пулинга для токенов.

        Эта функция используется для агрегирования выходных данных модели (представлений токенов)
        в одно усредненное векторное представление для каждого входного примера.

        Параметры:
            model_output: Выходные данные модели.
            attention_mask: Маска внимания.

        Возвращает:
            torch.Tensor: Усредненное векторное представление.
        """
        # Получение векторных представлений токенов из последнего скрытого состояния модели
        token_embeddings = model_output.last_hidden_state

        # Расширение маски внимания для соответствия размерам токенных векторов
        input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.shape)

        # Умножение каждого токенного вектора на его маску внимания и суммирование
        # для получения общего векторного представления для каждого примера
        sum_embeddings = torch.sum(token_embeddings * input_mask_expanded, 1)

        # Подсчет количества токенов для каждого примера (с учетом маски внимания)
        sum_mask = torch.clamp(input_mask_expanded.sum(1), min=1e-9)

        # Деление суммы векторов на количество токенов для получения усредненного представления
        return sum_embeddings / sum_mask