In [None]:
def main():

  # Выбор носителя
  device = 'cuda'

  # Установка библиотек
  ! pip install pytelegrambotapi
  ! pip install -U transformers
  ! pip install -U accelerate
  ! pip install sentence_transformers
  ! pip install python-dotenv

  # Загрузка бибилиотек
  import telebot
  from telebot import types
  from sentence_transformers.util import semantic_search
  import torch
  import pandas as pd
  import numpy as np
  from transformers import AutoTokenizer, AutoModel, AutoModelForQuestionAnswering
  import re
  import random

  # Подключение к Google Drive (для работы в Google Colab)
  from google.colab import drive
  drive.mount('/content/drive', force_remount=True)

  # Загрузка токена доступа к телеграм-боту
  from dotenv import load_dotenv
  import os
  load_dotenv('drive/MyDrive/Colab Notebooks/semantic_search_QA_telebot/.env')
  TOKEN = os.getenv("TOKEN")


  # Функции для работы с семантической моделью:
  def get_semantic_model(device):
    """
    Функция загружает модель для семантического поиска
    """
    sem_model = AutoModel.from_pretrained(
      'sentence-transformers/LaBSE').to(device)
    return sem_model


  def get_semantic_tokenizer():
    """
    Функция загружает токенизатор модели для семантического поиска
    """
    sem_tokenizer = AutoTokenizer.from_pretrained(
      'sentence-transformers/LaBSE')
    return sem_tokenizer


  def get_qa_model(device):
    """
    Функция загружает модель для поиска ответа на вопрос
    """
    qa_model = AutoModelForQuestionAnswering.from_pretrained("timpal0l/mdeberta-v3-base-squad2").to(device)
    return qa_model


  def get_qa_tokenizer():
    """
    Функция загружает токенизатор модели для поиска ответа на вопрос
    """
    qa_tokenizer = AutoTokenizer.from_pretrained("timpal0l/mdeberta-v3-base-squad2")
    return qa_tokenizer


  def mean_pooling(model_output, attention_mask):
    """
    Функция выполняет объединение средних значений (mean pooling) - выбирает среднее значение среди эмбеддингов
    """
    token_embeddings = model_output[0] #First element of model_output contains all token embeddings
    input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
    sum_embeddings = torch.sum(token_embeddings * input_mask_expanded, 1)
    sum_mask = torch.clamp(input_mask_expanded.sum(1), min=1e-9)
    return sum_embeddings / sum_mask


  def get_text_embeddings():
    """
    Функция загружает датасет с эмбеддингами всех текстов базы знаний
    """
    sentence_embeddings_df = pd.read_csv("drive/MyDrive/Colab Notebooks/semantic_search_QA_telebot/data/knowledge_db_128.csv", low_memory=False, encoding = "UTF-8", sep = ",")
    sentence_embeddings_df.drop("Unnamed: 0", axis = 1, inplace = True)
    sentence_embeddings = torch.cuda.FloatTensor(sentence_embeddings_df.values)
    return sentence_embeddings


  def get_texts():
    """
    Функция загружает датасет всех текстов базы знаний
    """
    sentence_df = pd.read_csv("drive/MyDrive/Colab Notebooks/semantic_search_QA_telebot/data/Knowledge_base_df.csv", low_memory=False, encoding = "UTF-8", sep = ",")
    sentences = [i for i in sentence_df["text"]]
    return sentences


  def encode_question(tokenizer, question, model, device):
    """
    Функция получает эмбеддинги вопросов пользователя к базе данных
    """
    encoded_input = tokenizer(
      question,
      padding=True,
      truncation=True,
      max_length=128,
      return_tensors='pt')

    with torch.no_grad():
      model_output = model(**encoded_input.to(device))

    question_embeddings = mean_pooling(
      model_output,
      encoded_input['attention_mask']
    )
    return question_embeddings


  def search(question_embeddings, sentence_embeddings):
    """
    Функция выполняет семантический поиск по базе знаний
    Выбираются k текстов из базы, наиболее близких по косинусному сходству эмбеддингов к вопросу пользователя
    """
    hits = semantic_search(question_embeddings, sentence_embeddings, top_k=1)
    return sentences[hits[0][0]["corpus_id"]]


  def get_answer(qa_model, qa_tokenizer, question, text, device):
    """
    Функция выполняет поиск ответа на вопрос пользователя в наиболее подходящем тексте из базы знаний
    """
    # Токенизация вопроса
    tokenized = qa_tokenizer.encode_plus(
      question, text,
      add_special_tokens=False
    )
    tokens = qa_tokenizer.convert_ids_to_tokens(tokenized['input_ids'])

    # Общая длина каждого блока
    max_chunk_length = 512
    # Длина наложения
    overlapped_length = 30
    # Длина вопроса в токенах
    answer_tokens_length = tokenized.token_type_ids.count(0)
    # Токены вопроса, закодированные числами
    answer_input_ids = tokenized.input_ids[:answer_tokens_length]
    # Длина основного текста первого блока без наложения
    first_context_chunk_length = max_chunk_length - answer_tokens_length
    # Длина основного текста остальных блоков с наложением
    context_chunk_length = max_chunk_length - answer_tokens_length - overlapped_length

    # Токены основного текста
    context_input_ids = tokenized.input_ids[answer_tokens_length:]
    # Основной текст первого блока
    first = context_input_ids[:first_context_chunk_length]
    # Основной текст остальных блоков
    others = context_input_ids[first_context_chunk_length:]

    # Если есть блоки кроме первого
    # тогда обрабатываются все блоки
    if len(others) > 0:
      # Кол-во нулевых токенов, для выравнивания последнего блока по длине
      padding_length = context_chunk_length - (len(others) % context_chunk_length)
      others += [0] * padding_length

      # Кол-во блоков и их длина без добавления наложения
      new_size = (
          len(others) // context_chunk_length,
          context_chunk_length
      )

      # Упаковка блоков
      new_context_input_ids = np.reshape(others, new_size)

      # Вычисление наложения
      overlappeds = new_context_input_ids[:, -overlapped_length:]
      # Добавление в наложения частей из первого блока
      overlappeds = np.insert(overlappeds, 0, first[-overlapped_length:], axis=0)
      # Удаление наложение из последнего блока, так как оно не нужно
      overlappeds = overlappeds[:-1]

      # Добавление наложения
      new_context_input_ids = np.c_[overlappeds, new_context_input_ids]
      # Добавление первого блока
      new_context_input_ids = np.insert(new_context_input_ids, 0, first, axis=0)

      # Добавление вопроса в каждый блок
      new_input_ids = np.c_[
        [answer_input_ids] * new_context_input_ids.shape[0],
        new_context_input_ids
      ]
    # иначе обрабатывается только первый
    else:
      # Кол-во нулевых токенов, для выравнивания блока по длине
      padding_length = first_context_chunk_length - (len(first) % first_context_chunk_length)
      # Добавление нулевых токенов
      new_input_ids = np.array(
        [answer_input_ids + first + [0] * padding_length]
      )

    # Кол-во блоков
    count_chunks = new_input_ids.shape[0]

    # Маска, разделяющая вопрос и текст
    new_token_type_ids = [
      # вопрос блока
      [0] * answer_tokens_length
      # текст блока
      + [1] * (max_chunk_length - answer_tokens_length)
    ] * count_chunks

    # Маска "внимания" модели на все токены, кроме нулевых в последнем блоке
    new_attention_mask = (
      # во всех блоках, кроме последнего, "внимание" на все слова
      [[1] * max_chunk_length] * (count_chunks - 1)
      # в последнем блоке "внимание" только на ненулевые токены
      + [([1] * (max_chunk_length - padding_length)) + ([0] * padding_length)]
    )

    # Токенизированный текст в виде блоков, упакованный в torch
    new_tokenized = {
    'input_ids': torch.tensor(new_input_ids).to(device),
    'token_type_ids': torch.tensor(new_token_type_ids).to(device),
    'attention_mask': torch.tensor(new_attention_mask).to(device)
    }
    device = "cuda"
    outputs = qa_model(**new_tokenized)

    # Позиции в 2D списке токенов начала и конца наиболее вероятного ответа
    # позиции одним числом
    start_index = torch.argmax(outputs.start_logits)
    end_index = torch.argmax(outputs.end_logits)

    # Пересчёт позиций начала и конца ответа для 1D списка токенов
    # = длина первого блока + (
    #   позиция - длина первого блока
    #   - длина ответов и отступов во всех блоках, кроме первого
    # )
    start_index = max_chunk_length + (
      start_index - max_chunk_length
      - (answer_tokens_length + overlapped_length)
      * (start_index // max_chunk_length)
    )
    end_index = max_chunk_length + (
      end_index - max_chunk_length
      - (answer_tokens_length + overlapped_length)
      * (end_index // max_chunk_length)
    )

    # Составление ответа
    # если есть символ начала слова '▁', то он заменяется на пробел
    answer_raw = ''.join(
      [t.replace('▁', ' ') for t in tokens[start_index:end_index+1]]
    )
    # Если ответ содержит скобки, скобки удаляются
    answer = re.sub(r"\(+|\)+", "", answer_raw)
    # Возвращается словарь, содержащий ответ на вопрос (answer) и наиболее подходящий текст, откуда взят ответ (text)
    return {"answer" : answer.strip(), "text" : text}


  def process_question(question, sem_tokenizer, sem_model, device):
    """
    Функция получает вопрос пользователя к базе данных,
    вызывает функции:
      получения эмбеддингов вопроса (encode_question),
      семантического поиска подходящего текста(search),
      поиска ответа на вопрос в тексте(get_answer)
    возвращает словарь, содержащий ответ на вопрос (answer) и наиболее подходящий текст, откуда взят ответ (text)
    """
    question_embeddings = encode_question(sem_tokenizer, question, sem_model, device)
    text = search(question_embeddings, sentence_embeddings)
    answer_dict = get_answer(qa_model, qa_tokenizer, question, text, device)
    torch.cuda.empty_cache()
    return answer_dict


  # Определение параметров телеграм-бота (telebot)
  your_bot = TOKEN
  bot = telebot.TeleBot(your_bot)

  # Готовые вопросы для телеграм-бота
  questions = [
    'Можешь рассказать о C++ (си-плюс-плюс)?',
    'Что такое Dark Web (дарквеб, тёмная паутина)?',
    'Что такое Midjourney?',
    'Что такое Stable Diffusion?',
    'Что такое Netfilter?',
    'Что такое LLaMA?',
    'Что такое уязвимость нулевого дня (также известная как 0-day)?',
    'Хочу узнать про 5G',
    'Что такое Битрикс24?',
    'Что такое SOCKS?',
    'Что такое DALL-E?',
    'Что значит "эмулятор"?',
    'Что такое троян?',
    'Что такое интернет вещей?',
    'Что такое Pretty Good Privacy?',
    'Что такое макровирус?',
    'Что такое лутбокс?',
    'Какой хакер считается этичным?',
    'Что такое honeypot?',
    'Расскажи про первый компьютерный червь',
    'Расскажи про UNIX',
    'Что такое GitHub?',
    'Что такое Linux?',
    'Что такое ChatGPT?',
    'Расскажи про проект Apache',
    'Что такое резервная копия?',
    'Что такое Clang?',
    'Что такое биткойн?',
    'Что такое Internet Standard?',
    'Что значит Sensitive Information?'
  ]

  # Путь начала работы телеграм-бота
  @bot.message_handler(commands=["start"])
  def start(message, res=False):
    markup=types.ReplyKeyboardMarkup(resize_keyboard=True)
    item1=types.KeyboardButton("Случайный вопрос")
    item2=types.KeyboardButton("Помощь")
    markup.row(item1, item2)
    bot.send_message(message.chat.id, 'Здравствуйте! Я – Сиби (Cybersecurity Knowledge Bot). Я буду рад ответить на ваши вопросы по кибербезопасности. Пожалуйста, введите ваш вопрос:', reply_markup=markup)

  # Путь телеграм-бота для получения и обработки вопроса пользователя к базе знаний
  @bot.message_handler(content_types=["text"])
  def handle_text(message):

    if message.text.strip() == 'Помощь':
      answer = 'Введите интересующий вас вопрос по теме кибербезопасности или нажмите кнопку "Случайный вопрос". Я дам краткий ответ и подберу подходящую статью из базы знаний. Лучше всего я понимаю вопросы вида "Что такое ChatGPT?". Если ответ не подошел, попробуйте сформулировать вопрос по-другому.'
      bot.send_message(message.chat.id, answer)

    elif message.text.strip() == 'Случайный вопрос':
      question = questions[random.randint(0, len(questions) - 1)]
      answer_dict = process_question(question, sem_tokenizer, sem_model, device)
      bot.send_message(message.chat.id, question)
      bot.send_message(message.chat.id, "Ответ: " + answer_dict["answer"])
      bot.send_message(message.chat.id, "Подробнее: " + answer_dict["text"])

    else:
      question = message.text.strip()
      answer_dict = process_question(question, sem_tokenizer, sem_model, device)
      bot.send_message(message.chat.id, "Ответ: " + answer_dict["answer"])
      bot.send_message(message.chat.id, "Подробнее: " + answer_dict["text"])


  # получение семантической модели и токенизатора
  sem_model = get_semantic_model(device)
  sem_tokenizer = get_semantic_tokenizer()

  # получение qa-модели и токенизатора
  qa_model = get_qa_model(device)
  qa_tokenizer = get_qa_tokenizer()

  # получение эмбеддингов и текстов базы знаний
  sentence_embeddings = get_text_embeddings()
  sentences = get_texts()
  print("success")

  # активация телеграм-бота
  bot.polling(none_stop=True, interval=0)


if __name__ == "__main__":
    main()

Mounted at /content/drive
success
