# Решение задачи

Скачаем необходимые библиотеки:

In [None]:
!pip install -q aiogram
!pip install -q openai wikipedia tiktoken mwclient mwparserfromhell
!pip install -q nest_asyncio

Импортируем необходимые библиотеки:

In [None]:
import os
import asyncio
import re
import warnings
import requests
import numpy as np
import openai
from openai import OpenAI
import tiktoken
import wikipedia
import pandas as pd
import ast
from scipy import spatial
import mwclient
import mwparserfromhell
from aiogram import Bot, Dispatcher, types
from aiogram.filters import Command
from google.colab import userdata

import nest_asyncio
nest_asyncio.apply() #позволяет использовать asyncio.run() в средах с вложенным циклом событий

Отключение предупреждений:

In [None]:
warnings.filterwarnings('ignore')

Получаем ключ от **VSEGPT** и токен для Telegram-бота. Предварительно добавили их в секреты **Google Colab**. Также выполним настройку клиента **OpenAI**, заточенную под сервис, на котором приобретен ключ:

In [None]:
try:
    openai_key = userdata.get('VSEGPT_API_K')
    TELEGRAM_TOKEN = userdata.get('BOT_API')
except userdata.SecretNotFoundError as e:
    raise Exception(f"Секрет не найден: {e}")

os.environ["OPENAI_API_KEY"] = openai_key
client = OpenAI(
    api_key=openai_key,
    base_url="https://api.vsegpt.ru/v1",
)

`GPT_MODEL` = 'gpt-3.5-turbo' — используемая модель GPT для генерации ответов на вопросы;

`CATEGORY_TITLE` = "Category:Academy Awards" — категория Википедии, из которой собираются данные (все статьи, связанные с Оскарами);

`WIKI_SITE` = "en.wikipedia.org" — сайт Википедии (английская версия), откуда извлекается информация;

`MAX_TOKENS` = 1600 — максимальное количество токенов в одном куске текста (для обработки длинных статей);

`EMBEDDING_MODEL` = "text-embedding-ada-002" — модель для создания векторных представлений текста (для поиска релевантных ответов);

`SECTIONS_TO_IGNORE` — список разделов статей Википедии, которые игнорируются, чтобы оставить только полезный контент.

In [None]:
GPT_MODEL = 'gpt-3.5-turbo'
CATEGORY_TITLE = "Category:Academy Awards"
WIKI_SITE = "en.wikipedia.org"
MAX_TOKENS = 1600
EMBEDDING_MODEL = "text-embedding-ada-002"
SECTIONS_TO_IGNORE = [
    "See also", "References", "External links", "Further reading",
    "Footnotes", "Bibliography", "Sources", "Citations",
    "Literature", "Footnotes", "Notes and references", "Photo gallery",
    "Works cited", "Photos", "Gallery", "Notes",
    "References and sources", "References and notes",
]

Эта функция рекурсивно собирает заголовки статей из категории Википедии и её подкатегорий:

In [None]:
def titles_from_category(
    category: mwclient.listing.Category, #задаем типизированный параметр категории статей
    max_depth: int #определяем глубину вложения статей
) -> set[str]:
    titles = set() #используем множество для хранения заголовков статей
    for cm in category.members(): #перебираем вложенные объекты категории
        if type(cm) == mwclient.page.Page: #если объект является страницей
            titles.add(cm.name) #в хранилище заголовков добавляем имя страницы
        elif isinstance(cm, mwclient.listing.Category) and max_depth > 0: #если объект является категорией и глубина вложения не достигла максимальной
            deeper_titles = titles_from_category(cm, max_depth=max_depth - 1) #вызываем рекурсивно функцию для подкатегории
            titles.update(deeper_titles) #добавление в множество элементов из другого множества
    return titles

Следующая функция возвращает список всех вложенных секций для заданной секции страницы Википедии:

In [None]:
def all_subsections_from_section(
    section: mwparserfromhell.wikicode.Wikicode, #текущая секция
    parent_titles: list[str], #заголовки родителя
    sections_to_ignore: set[str], #секции, которые необходимо проигнорировать
) -> list[tuple[list[str], str]]:

    #извлекаем заголовки текущей секции
    headings = [str(h) for h in section.filter_headings()]
    title = headings[0]
    #заголовки Википедии имеют вид: "== Heading =="
    if title.strip("=" + " ") in sections_to_ignore:
        #если заголовок секции в списке для игнора, то пропускаем его
        return []

    titles = parent_titles + [title] #объединим заголовки и подзаголовки, чтобы сохранить контекст для chatGPT
    full_text = str(section) #преобразуем wikicode секции в строку
    section_text = full_text.split(title)[1] #выделяем текст секции без заголовка
    if len(headings) == 1:
        #если один заголовок, то формируем результирующий список
        return [(titles, section_text)]
    else:
        first_subtitle = headings[1]
        section_text = section_text.split(first_subtitle)[0]
        results = [(titles, section_text)] #формируем результирующий список из текста до первого подзаголовка
        for subsection in section.get_sections(levels=[len(titles) + 1]):
            results.extend(
                #вызываем функцию получения вложенных секций для заданной секции
                all_subsections_from_section(subsection, titles, sections_to_ignore)
                )  #объединяем результирующие списки данной функции и вызываемой
        return results

Эта функция возвращает список всех секций страницы, за исключением тех, которые отбрасываем:

In [None]:
def all_subsections_from_title(
    title: str, #заголовок статьи Википедии, которую парсим
    sections_to_ignore: set[str] = SECTIONS_TO_IGNORE, #секции, которые игнорируем
    site_name: str = WIKI_SITE, #ссылка на сайт википедии
) -> list[tuple[list[str], str]]:

    #инициализация объекта MediaWiki
    #WIKI_SITE ссылается на англоязычную часть Википедии
    site = mwclient.Site(site_name)
    page = site.pages[title] #запрашиваем страницу по заголовку
    text = page.text() #получаем текстовое представление страницы
    parsed_text = mwparserfromhell.parse(text) #удобный парсер для MediaWiki
    headings = [str(h) for h in parsed_text.filter_headings()] #извлекаем заголовки
    if headings: #если заголовки найдены
        #в качестве резюме берем текст до первого заголовка
        summary_text = str(parsed_text).split(headings[0])[0]
    else:
        #если нет заголовков, то весь текст считаем резюме
        summary_text = str(parsed_text)
    results = [([title], summary_text)] #добавляем резюме в результирующий список
    for subsection in parsed_text.get_sections(levels=[2]): #извлекаем секции 2-го уровня
        results.extend(
            #вызываем функцию получения вложенных секций для заданной секции
            all_subsections_from_section(subsection, [title], sections_to_ignore)
        ) #объединяем результирующие списки данной функции и вызываемой
    return results

Далее напишем функцию для очистки текста секции от ссылок <ref>xyz</ref>, начальных и конечных пробелов:

In [None]:
def clean_section(section: tuple[list[str], str]) -> tuple[list[str], str]:
    titles, text = section
    text = re.sub(r"<ref.*?</ref>", "", text) #удаляем ссылки
    text = text.strip() #удаляем пробелы вначале и конце
    return (titles, text)

Отфильтруем короткие и пустые секции:

In [None]:
def keep_section(section: tuple[list[str], str]) -> bool:
    _, text = section
    return len(text) >= 16

Функция подсчета токенов:

In [None]:
def num_tokens(text: str, model: str = GPT_MODEL) -> int:
    encoding = tiktoken.encoding_for_model(model)
    return len(encoding.encode(text))

Функция разделения строк:

In [None]:
def halved_by_delimiter(string: str, delimiter: str = "\n") -> list[str, str]:
    chunks = string.split(delimiter) #делим строку на части по разделителю, по умолчанию \n - перенос строки
    if len(chunks) == 1:
        return [string, ""]  #разделитель не найден
    elif len(chunks) == 2:
        return chunks  #нет необходимости искать промежуточную точку
    else:
        #считаем токены
        total_tokens = num_tokens(string)
        halfway = total_tokens // 2
        best_diff = halfway #предварительное разделение по середине числа токенов
        #в цикле ищем какой из разделителей, будет ближе всего к best_diff
        for i, chunk in enumerate(chunks):
            left = delimiter.join(chunks[: i + 1])
            left_tokens = num_tokens(left)
            diff = abs(halfway - left_tokens)
            if diff >= best_diff:
                break
            else:
                best_diff = diff
        left = delimiter.join(chunks[:i])
        right = delimiter.join(chunks[i:])
        #возвращаем левую и правую часть оптимально разделенной строки
        return [left, right]

Затем создадим функцию, которая обрезает строку до максимально разрешенного числа токенов:

In [None]:
def truncated_string(
    string: str, #строка
    model: str, #модель
    max_tokens: int, #максимальное число разрешенных токенов
    print_warning: bool = True, #флаг вывода предупреждения
) -> str:
    encoding = tiktoken.encoding_for_model(model)
    encoded_string = encoding.encode(string)
    truncated_string = encoding.decode(encoded_string[:max_tokens]) #обрезаем строку и декодируем обратно
    if print_warning and len(encoded_string) > max_tokens:
        print(f"Предупреждение: Строка обрезана с {len(encoded_string)} токенов до {max_tokens} токенов.")
    #усеченная строка
    return truncated_string

Функция делит секции статьи на части по максимальному числу токенов:

In [None]:
def split_strings_from_subsection(
    subsection: tuple[list[str], str], #секции
    max_tokens: int = 1000, #максимальное число токенов
    model: str = GPT_MODEL, #модель
    max_recursion: int = 5, #максимальное число рекурсий
) -> list[str]:
    titles, text = subsection
    string = "\n\n".join(titles + [text])
    num_tokens_in_string = num_tokens(string)
    #если длина соответствует допустимой, то вернет строку
    if num_tokens_in_string <= max_tokens:
        return [string]
    #если в результате рекурсия не удалось разделить строку, то просто усечем ее по числу токенов
    elif max_recursion == 0:
        return [truncated_string(string, model=model, max_tokens=max_tokens)]
    #иначе разделим пополам и выполним рекурсию
    else:
        titles, text = subsection
        for delimiter in ["\n\n", "\n", ". "]: #пробуем использовать разделители от большего к меньшему (разрыв, абзац, точка)
            left, right = halved_by_delimiter(text, delimiter=delimiter)
            if left == "" or right == "":
                #если какая-либо половина пуста, повторяем попытку с более простым разделителем
                continue
            else:
                #применим рекурсию на каждой половине
                results = []
                for half in [left, right]:
                    half_subsection = (titles, half)
                    half_strings = split_strings_from_subsection(
                        half_subsection,
                        max_tokens=max_tokens,
                        model=model,
                        max_recursion=max_recursion - 1, #уменьшаем максимальное число рекурсий
                    )
                    results.extend(half_strings)
                return results
    #иначе никакого разделения найдено не было, поэтому просто обрезаем строку (должно быть очень редко)
    return [truncated_string(string, model=model, max_tokens=max_tokens)]

Функция отправки **chatGPT** строки для ее токенизации (вычисления эмбедингов):

In [None]:
def get_embedding(text, model=EMBEDDING_MODEL):
    return client.embeddings.create(input=[text], model=model).data[0].embedding

Эта функция находит самые релевантные тексты из базы знаний для заданного запроса:

In [None]:
def strings_ranked_by_relatedness(
    query: str, #пользовательский запрос
    df: pd.DataFrame, #DataFrame со столбцами text и embedding (база знаний)
    relatedness_fn=lambda x, y: 1 - spatial.distance.cosine(x, y), # функция схожести, косинусное расстояние
    top_n: int = 100 #выбор лучших n-результатов
) -> tuple[list[str], list[float]]: #функция возвращает кортеж двух списков, первый содержит строки, второй - числа с плавающей запятой

    #отправляем в OpenAI API пользовательский запрос для токенизации
    query_embedding_response = openai.embeddings.create(
        model=EMBEDDING_MODEL,
        input=query,
    )
    query_embedding = query_embedding_response.data[0].embedding #получен токенизированный пользовательский запрос
    #сравниваем пользовательский запрос с каждой токенизированной строкой DataFrame
    strings_and_relatednesses = [
        (row["text"], relatedness_fn(query_embedding, row["embedding"]))
        for i, row in df.iterrows()
    ]
    strings_and_relatednesses.sort(key=lambda x: x[1], reverse=True) #сортируем по убыванию схожести полученный список
    strings, relatednesses = zip(*strings_and_relatednesses) #преобразовываем наш список в кортеж из списков
    return strings[:top_n], relatednesses[:top_n] #возвращаем n лучших результатов

Функция формирования запроса к **chatGPT** по пользовательскому вопросу и базе знаний:

In [None]:
def query_message(
    query: str, #пользовательский запрос
    df: pd.DataFrame, #DataFrame со столбцами text и embedding (база знаний)
    model: str, #модель
    token_budget: int #ограничение на число отсылаемых токенов в модель
) -> str:
    strings, relatednesses = strings_ranked_by_relatedness(query, df) #функция ранжирования базы знаний по пользовательскому запросу
    #шаблон инструкции для chatGPT
    message = 'Use the below articles on the 2022 Winter Olympics to answer the subsequent question. If the answer cannot be found in the articles, write "I could not find an answer."'
    #шаблон для вопроса
    question = f"\n\nQuestion: {query}"
    #добавляем к сообщению для chatGPT релевантные строки из базы знаний, пока не выйдем за допустимое число токенов
    for string in strings:
        next_article = f'\n\nWikipedia article section:\n"""\n{string}\n"""'
        if (num_tokens(message + next_article + question, model=model) > token_budget):
            break
        else:
            message += next_article
    return message + question

Финальная функция для обращения к **chatGPT**:

In [None]:
def ask(
    query: str, #пользовательский запрос
    df: pd.DataFrame, #DataFrame со столбцами text и embedding (база знаний)
    model: str = GPT_MODEL, #модель
    token_budget: int = 4096 - 500, #ограничение на число отсылаемых токенов в модель
    print_message: bool = False, #нужно ли выводить сообщение перед отправкой
) -> str:

    #формируем сообщение к chatGPT (функция выше)
    message = query_message(query, df, model=model, token_budget=token_budget)
    #если параметр True, то выводим сообщение
    if print_message:
        print(message)
    messages = [
        {"role": "system", "content": "You answer questions about Academy Awards."},
        {"role": "user", "content": message},
    ]
    response = client.chat.completions.create(
        model=model,
        messages=messages,
        temperature=0 #гиперпараметр степени случайности при генерации текста. Влияет на то, как модель выбирает следующее слово в последовательности
    )
    response_message = response.choices[0].message.content
    return response_message

Реализуем все наши раннее созданные функции, создадим базу знаний, которую будем использовать для работы с **chatGPT**:

In [None]:
SAVE_PATH = "./academy_awards_knowledge_base.csv" #путь к файлу, где будет сохранена/загружена база знаний

#проверяем, существует ли уже сохраненная база знаний
if os.path.exists(SAVE_PATH):
    #если файл существует — загружаем его
    print(f"Загрузка базы знаний из файла: {SAVE_PATH}")
    df = pd.read_csv(SAVE_PATH)  #чтение CSV с помощью pandas
    df['embedding'] = df['embedding'].apply(ast.literal_eval) #преобразуем эмбеддинги из строк в списки чисел (при сохранении они хранились как строки)

    print(f"База знаний загружена: {len(df)} записей")
else:
    #если базы знаний нет — создаём новую
    print("Создание базы знаний...")

    site = mwclient.Site(WIKI_SITE) #подключаемся к Википедии через библиотеку mwclient
    category_page = site.pages[CATEGORY_TITLE] #получаем категорию "Academy Awards" (все статьи об Оскарах)
    titles = titles_from_category(category_page, max_depth=1) #собираем заголовки статей из категории (max_depth=1 — только первые подкатегории)
    print(f"Создано {len(titles)} заголовков статей в категории {CATEGORY_TITLE}.")

    #извлекаем все секции и подсекции из статей
    wikipedia_sections = []
    for title in titles:
        wikipedia_sections.extend(all_subsections_from_title(title))
    print(f"Найдено {len(wikipedia_sections)} секций на {len(titles)} страницах")

    wikipedia_sections = [clean_section(ws) for ws in wikipedia_sections] #очищаем секции от вики-разметки и ссылок вроде <ref>...</ref>

    #фильтруем секции: оставляем только те, у которых достаточно текста (длина > 16 символов)
    original_num_sections = len(wikipedia_sections)
    wikipedia_sections = [ws for ws in wikipedia_sections if keep_section(ws)]
    print(f"Отфильтровано {original_num_sections-len(wikipedia_sections)} секций, осталось {len(wikipedia_sections)} секций.")

    #разбиваем длинные секции на части по MAX_TOKENS (1600 токенов) для обработки GPT
    wikipedia_strings = []
    for section in wikipedia_sections:
        wikipedia_strings.extend(split_strings_from_subsection(section, max_tokens=MAX_TOKENS))
    print(f"{len(wikipedia_sections)} секций Википедии поделены на {len(wikipedia_strings)} строк.")

    #создаем DataFrame для хранения текстов и их эмбеддингов
    print("Создание эмбеддингов...")
    df = pd.DataFrame({"text": wikipedia_strings[:100]})  #берём первые 100 записей для экономии времени

    df['embedding'] = df.text.apply(lambda x: get_embedding(x, model=EMBEDDING_MODEL)) #генерируем эмбеддинги для каждого текста с помощью OpenAI API (model=text-embedding-ada-002)

    #сохраняем базу знаний в CSV для повторного использования
    df.to_csv(SAVE_PATH, index=False)
    print(f"База знаний сохранена в: {SAVE_PATH}")

Загрузка базы знаний из файла: ./academy_awards_knowledge_base.csv
База знаний загружена: 100 записей


Для информативности и ясности выведем датафрейм:

In [None]:
df.head()

Unnamed: 0,text,embedding
0,3rd Academy Awards\n\n{{Oscars short descripti...,"[-0.017022229731082916, -0.020548079162836075,..."
1,3rd Academy Awards\n\n== Winners and nominees=...,"[-0.011391614563763142, -0.007212481927126646,..."
2,3rd Academy Awards\n\n== Winners and nominees=...,"[-0.012653084471821785, -0.019328653812408447,..."
3,3rd Academy Awards\n\n== Multiple nominations ...,"[-0.015558500774204731, -0.007036137860268354,..."
4,Academy Awards\n\n{{Short description|Annual a...,"[-0.011331407353281975, 0.0011018485529348254,..."


Этот код реализует **Telegram-бота,** который отвечает на вопросы об Оскарах (Academy Awards), используя предварительно созданную базу знаний с эмбеддингами, асинхронную обработку сообщений и интеграцию с моделью **GPT** для генерации ответов:

In [None]:
bot = Bot(token=TELEGRAM_TOKEN)
dp = Dispatcher()

#информация о базе знаний
KNOWLEDGE_BASE_INFO = {
    "тематика": "Academy Awards (Оскары)",
    "число записей": len(df),
    "пример запроса": "Tell me about 3rd Academy Awards"
}

@dp.message(Command("start", "help"))
async def help_command(message: types.Message):
    help_text = (
        "🎬 Бот для вопросов об Academy Awards (Оскарах)\n"
        "ℹ️ Информация о базе знаний:\n"
        f"- Тематика: {KNOWLEDGE_BASE_INFO['тематика']}\n"
        f"- Число записей: {KNOWLEDGE_BASE_INFO['число записей']}\n"
        f"- Пример запроса: {KNOWLEDGE_BASE_INFO['пример запроса']}\n\n"
        "Просто задайте вопрос об Оскарах, и я постараюсь найти ответ в базе знаний!"
    )
    await message.answer(help_text)

@dp.message()
async def handle_query(message: types.Message):
    user_query = message.text.strip()

    if not user_query:
        await message.answer("Пожалуйста, введите ваш вопрос.")
        return

    try:
        await bot.send_chat_action(message.chat.id, "typing") #показываем индикатор набора текста

        #используем функцию ask для получения ответа
        response = ask(user_query, df=df)
        await message.answer(response)
    except Exception as e:
        error_msg = f"⚠️ Ошибка при обработке запроса: {str(e)}"
        #обрезаем сообщение об ошибке, если оно слишком длинное
        await message.answer(error_msg[:4000])


async def main():
    print(f"Бот запущен с токеном: {TELEGRAM_TOKEN[:5]}...{TELEGRAM_TOKEN[-5:]}")
    print(f"Размер базы знаний: {len(df)} записей")
    print("Ожидание сообщений...")
    await dp.start_polling(bot)

if __name__ == "__main__":
    try:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
    except KeyboardInterrupt:
        print("\nБот остановлен")
    except Exception as e:
        print(f"Критическая ошибка: {str(e)}")

Бот запущен с токеном: 79835...SjPlM
Размер базы знаний: 100 записей
Ожидание сообщений...


