In [1]:
import getpass
import os
import threading
import json
import re
from typing_extensions import List

from langchain_core.tools import tool
from langchain_core.documents import Document
from langchain_core.messages import SystemMessage, HumanMessage

from langchain_text_splitters import RecursiveCharacterTextSplitter

from langgraph.graph import MessagesState, StateGraph, END
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.checkpoint.memory import MemorySaver

from langchain_openai.chat_models import ChatOpenAI

from langchain.embeddings import HuggingFaceEmbeddings

from langchain_community.tools import DuckDuckGoSearchResults
from langchain_core.vectorstores import InMemoryVectorStore

import requests

from sklearn.feature_extraction.text import TfidfVectorizer

import numpy as np

import web_scrapers
import constants
import api

import yaml
from box import Box

In [None]:
with open('config.yaml', 'r') as f:
    config_dict = yaml.safe_load(f)
config = Box(config_dict)

In [3]:
if not os.environ.get('OPENROUTER_API_KEY'):
  os.environ['OPENROUTER_API_KEY'] = getpass.getpass('Enter API key for OpenRouter: ')

if not os.environ.get('OPENROUTER_BASE_URL'):
  os.environ['OPENROUTER_BASE_URL'] = 'https://openrouter.ai/api/v1'

Enter API key for OpenRouter: ··········


В качестве LLM выбрана gpt-4.1-nano. Дешевая, быстрая модель, поддерживающая тулзы - это будет важно позже

In [5]:
llm = ChatOpenAI(
  openai_api_key=os.environ.get('OPENROUTER_API_KEY'),
  openai_api_base=os.environ.get('OPENROUTER_BASE_URL'),
  model_name=config.model.name,
  temperature=config.model.temperature
)

Создаем локальное векторное хранилище, к нему подключаем эмбеддинги E5 (теперь от HuggingFace, ведь, как оказалось, Fastembeddings под капотом меняли E5 на другую модель). По идее, E5 должна хорошо подходить для смеси русского текста и английской терминологии, а еще показывает неплохие показатели.

In [6]:
%%capture
embeddings = HuggingFaceEmbeddings(model_name=config.embeddings.model_name)
vector_store = InMemoryVectorStore(embedding=embeddings)

**UPD**: Используем crawler от Apify, у него есть интеграция с Langchain. Он пробегается по сайту от главной страницы вглубь и сохраняет информацию со всех страниц.

Начинаем с главной страницы сайта, идем до глубины 3, стараемся преобразовывать html в читаемый текст, убираем порог у readableText, чтобы не пропускать даже мелкие вставки. Используем firefox браузер из playwright, чтобы читать больше информации с сайта - на нем куча javascript'а, который Crawl4AI (используемый ранее), обработать не мог.



---


**Важно:**

Для большей воспроизводимости и упрощения работы с этим блокнотом я вынес все, что связано с использованием Apify в отдельную ячейку.

Вместо ввода API-токена и ожидания краулера можно запустить соседнюю ячейку, чтобы подгрузить те же данные, посчитанные мной заранее. Они хранятся у них на сервере и не требуют API-ключа для получения.

In [None]:
from langchain_apify import ApifyWrapper


if not os.environ.get('APIFY_API_TOKEN'):
  os.environ['APIFY_API_TOKEN'] = getpass.getpass('Enter API token for Apify: ')

apify = ApifyWrapper()

loader = apify.call_actor(
    actor_id='apify/website-content-crawler',
    run_input={
        'startUrls': [
            {'url': 'https://www.neoflex.ru/'}
            ],
        'maxCrawlPages': config.apify.max_pages,
        'maxCrawlDepth': config.apify.max_depth,
        'htmlTransformer': 'readableTextIfPossible',
        'readableTextCharThreshold': config.apify.threshold,
        'crawlerType': 'playwright:firefox'
        },
    dataset_mapping_function=lambda item: Document(
        page_content=item['text'] or '', metadata={'source': item['url']}
    ),
)

docs = loader.load()

In [7]:
# Для подгрузки без API токена
data = requests.get('https://api.apify.com/v2/datasets/oDw3TPSSsJ2dZ4ayz/items?clean=true&format=json')

docs = []
for item in data.json():
    docs.append(
        Document(
            page_content=item['text'] or '', metadata={'source': item['url']}
        )
    )

Делим полученные документы на чанки рекурсивным сплиттером:

In [8]:
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=config.documents.chunk_size,
    chunk_overlap=config.documents.chunk_overlap
)

all_splits = text_splitter.split_documents(docs)

Теперь займемся очищением документов от мусора.

Пишем функцию для очистки документов от навигационных артефактов, не несущих смысла:

In [9]:
def clean_navigation_artifacts(text: str):
    lines = text.splitlines()
    cleaned_lines = []
    for line in lines:
        line = line.strip()

        if not line:
            continue
        if re.fullmatch(r'20\d{2}', line):
            continue
        if re.fullmatch(r'\d{1,2}', line):
            continue

        if line.lower() in [
            'previous', 'next', 'поделиться', 'отправить на e-mail', 'узнать'
            'пресс-центр', 'новости', 'сми о нас', 'показать еще', '...',
            'подписаться на новости', 'отправить', 'поделитьсяотправить на e-mail'
        ]:
            continue

        cleaned_lines.append(line)

    return '\n'.join(cleaned_lines)

Прогоняем через нее все полученные чанки:

In [10]:
for split in all_splits:
    split.page_content = clean_navigation_artifacts(split.page_content)

Прогоняем полученные чанки через TF-IDF фильтрацию, чтобы отсеять воду и мусор. В нашем случае удалим 30% документов, худшие по TF-IDF score:

In [11]:
texts = [doc.page_content for doc in all_splits]

vectorizer = TfidfVectorizer()
tfidf_matrix = vectorizer.fit_transform(texts)

scores = tfidf_matrix.sum(axis=1)
scores = np.array(scores).flatten()

threshold = np.percentile(scores, config.tfidf.threshold_percentile)

filtered_docs = [
    doc for doc, score in zip(all_splits, scores)
    if score > threshold
]

Обогащаем полученный набор документов информацией, которую crawler от Apify не смог достать с сайта - контактами компании в разных городах, информацией о клиентах Neoflex и имейлом отдела кадров.

О контактах и клиентах компании:

- Контакты разных офисов и клиенты компании подгружаются динамически без перехода на новые страницы. Crawler не умеет работать с такими элементами
- Я пытался найти API-запрос, по которому подгружается нужная информация, чтобы обогатить документы из него, но, как оказалось, все адреса захардкожены в обычный массив внутри vue.js скрипта, а информация о клиентах лежит частично хардкодом в DOM, а частично где-то во vue.js коде
- Вместо попыток достать информацию с бэкенда я решил написать скрэппер для этих страниц, который будет прокликивать кнопки и собирать информацию вручную. Да, это долго, но это работает

Об имейле отдела кадров:

- Как и с адресами офисов, при тестах выяснилось, что нет документов о подходящем для отправки резюме имейле.
- Эта информация был отсеяна crawler'ом на этапе преобразования html в текст, так как он счел ее нерелевантной
- Также достанем ее отдельным маленьким скрэппером

Собирать информацию будем скриптами на playwright из файла web_scrapers:

Применяем скрэпперы и заливаем информацию из них в общий массив документов:

In [13]:
city_data = await web_scrapers.scrape_city_addresses()

city_docs = [
    Document(
        metadata={'source': constants.CONTACTS_URL},
        page_content=f'Контакты офисов компании в городе {name} (адрес, электронная почта, телефон): {data}'
    )
    for name, data in city_data.items()
]

customer_data = await web_scrapers.scrape_customer_details()
customer_docs = [
    Document(
        metadata={'source': constants.CUSTOMERS_URL},
        page_content=f'Информация об одном из клиентов (заказчиков) компании Neoflex: {data}'
    )
    for data in customer_data
]

career_doc = Document(
    metadata={'source': constants.CAREER_URL},
    page_content=(await web_scrapers.scrape_career_details())
)

filtered_docs += city_docs
filtered_docs += customer_docs
filtered_docs.append(career_doc)

Теперь добавим теги ко всем документам для будущего поиска по ним. Ориентироваться будем на url страницы, породившей документ.

In [14]:
tags = {
    'Решения': '/solutions',
    'Кейсы': '/project-list',
    'Экспертизы': '/expertises',
    'О компании': '/about',
    'Партнеры': '/about/partners',
    'Клиенты': '/about/customers',
    'Карьера': '/about/career',
    'Пресс-центр': '/press-center',
    'Контакты': '/contacts',
}

for doc in filtered_docs:
    matched_tags = [
        tag for tag, snippet in tags.items()
        if snippet and snippet in doc.metadata.get('source', '')
    ]
    doc.metadata.setdefault('tags', []).extend(matched_tags)

Кроме этих тегов в теории можно добавить, например, теги городов ("Саратов", "Москва" и т.д.), сфер деятельности ("MLops", "Мобильная разработка") или компаний ("Сбер", "Россельхоз" и т.д.) парсингом page_content докуменов или прогнав их содержимое через LLM, подбирающее теги (такой подход будет использован для извлечения тегов из пользовательских запросов далее)

Наконец, добавляем обязательный префикс для E5:

In [15]:
for doc in filtered_docs:
    doc.page_content = f'passage: {doc.page_content}'

Выведем 5 рандомных чанков:

In [18]:
from random import randrange

for _ in range(5):
    idx = randrange(len(filtered_docs)-1)
    print(f'metadata: {filtered_docs[idx].metadata}')
    print(f'{filtered_docs[idx].page_content}\n\n')

metadata: {'source': 'https://www.neoflex.ru/projects/tsifrovye-bankovskie-produkty-i-servisy-dlya-msb', 'tags': []}
passage: Клиентам банка также доступны различные нефинансовые сервисы в электронном формате: регистрация бизнеса, онлайн-бухгалтерия, проверка контрагентов, привлечение клиентов, налоговое консультирование и юридическая поддержка. В ближайших планах – внедрение кредитного продукта в форме овердрафта.
Отзыв заказчика
Анатолий Медведев
Начальник управления малого и среднего бизнеса «Ренессанс Банка»
Предприниматели в «Ренессанс Банке» обслуживаются исключительно в цифровых каналах. Точка входа – сайт, там простая форма заявки на открытие счета. Развивая это направление бизнеса, мы сознательно идем по цифровому сценарию, потому что это отвечает запросам современных предпринимателей. Мы видим своей первостепенной задачей сделать дистанционное обслуживание максимально комфортным, надежным и удобным.
Новости и СМИ о проекте
«Ренессанс Банк» при поддержке Neoflex запустил цифро

Загружаем все внутрь VectorStore:

In [19]:
document_ids = vector_store.add_documents(documents=filtered_docs)

Пишем функцию get_query_tags: она будет обращаться к LLM, чтобы извлечь подходящие теги из пользовательского запроса.

In [None]:
def load_prompt(path):
    with open(path, "r", encoding="utf-8") as f:
        return f.read()

In [22]:
def get_query_tags(user_query: str):
    prompt = load_prompt('prompts/tag-getter-prompt.txt')
    f_prompt = prompt.format(user_query=user_query)
    response = llm.invoke(prompt)

    try:
        tags = eval(response.content.strip())
    except:
        tags = []

    return tags

Пишем функцию reformulate_query, которая обращается к LLM вне контекста, чтобы та переформулировала запрос.

In [23]:
def reformulate_query(user_query: str):
    user_query = user_query[:config.reformulate.max_length]

    prompt = [
        SystemMessage(content=(
            load_prompt('prompts/reformulation-prompt.txt')
        )),
        HumanMessage(content=user_query)
    ]
    response = llm.invoke(prompt)
    try:
        return response.content.strip() or user_query
    except:
        return user_query

Пишем функцию normalize_similarity_score. Она позволит удобнее и интуитивнее работать со скорами семантического поиска, нормализуя их на интервал [0, 1]. Это сильно удобнее, чем компактный интервал [0.7, 1], в котором по умолчанию лежат скоры для E5

In [24]:
def normalize_similarity_score(scored_docs, lo, hi):
    result = [
        (doc, (score - lo) / (hi - lo))
        for doc, score in scored_docs
    ]
    return result

Пишем функцию soft_merge. Ее задача - "мягко" объединить два набора документов. Удаляем из наборов все документы, у которых скор хуже среднего, а потом соединяем наборы вместе, сортируем и возвращаем K лучших документов, где K - длина наибольшего набора.

Если какие-то документы окажутся настолько релевантными, что перетянут в свою сторону средний скор, то функция может вернуть меньше, чем K документов. Это поведение ожидаемо - идея в том, что вылетевшие документы в таком случае не будут мешать модели работать с самыми релевантыми.

In [25]:
def soft_merge(lhs_docs, rhs_docs):
    res_len = max(len(lhs_docs), len(rhs_docs))

    lhs_docs = normalize_similarity_score(
        lhs_docs, config.embeddings.score_lo, config.embeddings.score_hi)
    rhs_docs = normalize_similarity_score(
        rhs_docs, config.embeddings.score_lo, config.embeddings.score_hi)

    lhs_mean = sum([score for _, score in lhs_docs]) / len(lhs_docs) if lhs_docs else 0
    rhs_mean = sum([score for _, score in rhs_docs]) / len(rhs_docs) if rhs_docs else 0
    lhs_docs = [(doc, score) for doc, score in lhs_docs if score >= lhs_mean]
    rhs_docs = [(doc, score) for doc, score in rhs_docs if score >= rhs_mean]

    docs = sorted(lhs_docs+rhs_docs, key=lambda x: x[1], reverse=True)
    unique_docs = {doc.page_content: (doc, score) for doc, score in docs}.values()
    unique_docs = list(unique_docs)

    return unique_docs[:res_len]

Объявляем функцию rerank_by_tags. Она принимает на вход массив документов, сравнивает их теги с целевыми и подтягивает скор докуметов в зависимости от количества совпадений. Возвращает отсортированный по скорам массив документов. При filter_irrelevant = True отсекает явный мусор.

Затем, наконец, пишем функцию retrieve_from_local. Она, используя все объявленные ранее функции для поиска и ранжирования документов, ищет релевантную информацию в локальном хранилище:

In [26]:
def rerank_by_tags(
    docs, target_tags, boost=config.rerank.boost,
    filter_irrelevant=False, threshold=config.rerank.threshold
):
    reranked = []

    if not target_tags:
        reranked = docs
    else:
        target_tags = {t.lower() for t in target_tags if isinstance(t, str)}
        for doc, score in docs:
            doc_tags = set(doc.metadata.get('tags', []))
            doc_tags_lo = {t.lower() for t in doc_tags}

            matches = len(doc_tags_lo & target_tags)
            adjusted_score = max(0.0, score + boost * matches)
            reranked.append((doc, adjusted_score))

    reranked_normalized = normalize_similarity_score(
        reranked, config.embeddings.score_lo, config.embeddings.score_hi)
    reranked_normalized.sort(key=lambda x: x[1], reverse=True)
    reranked.sort(key=lambda x: x[1], reverse=True)

    if filter_irrelevant:
        reranked = [
            (doc, score)
            for doc, score in reranked if score > threshold
            ]

    return reranked # Invalid return, reranked_normilized is being forgotten


@tool(response_format='content_and_artifact')
def retrieve_from_local(query: str):
    '''
    Используй этот инструмент для поиска актуальной, специфической информации
    о компании Neoflex в локальной базе знаний.

    Вызывай этот инструмент для вопросов о:
    - Адресах, местоположении офисов, контактных данных
    - Проектах, решениях, заказчиках, технологиях, платформах, партнерах
    - Конкретных фактах, датах, названиях, именах, ссылках и т.д.
    - Любых других специфических деталях операций или структуры Neoflex.

    Отдавай приоритет использованию этого инструмента перед своими внутренними знаниями
    для проверки специфических фактов, чтобы гарантировать точность.

    Attributes:
    query (str): строковый запрос на естественном языке,
        должен содержать ключевые слова для поиска.
    '''

    tags_set = set(tags.keys())
    found_tags = set.intersection(set(get_query_tags(query)), tags_set)

    r_query = reformulate_query(query)

    r_retrieved_docs = vector_store.similarity_search_with_score(
        f'query: {r_query}',
        k=config.semantic_search.k
    )
    retrieved_docs = vector_store.similarity_search_with_score(
        f'query: {query}',
        k=config.semantic_search.k
    )

    docs = soft_merge(retrieved_docs, r_retrieved_docs)
    reranked_docs = rerank_by_tags(docs, found_tags)

    serialized = '\n\n'.join(
        (f'Источник: {doc.metadata.get("url", "Неизвестный источник")}\n' f'Содержимое документа: {doc.page_content}')
        for doc, _ in reranked_docs
    )
    return serialized, reranked_docs

web_search = DuckDuckGoSearchResults()

Создаем будущие ноды графа:

Первая будет отвечать за поиск подходящей информации либо прямой ответ (для случаев, когда разговор идет не по теме)

Вторая - это ToolNode, к которому будет происходить обращение из первой ноды по необходимости

Третья - гененирует итоговый ответ по результатам обращения к тулзам

In [27]:
class State(MessagesState):
    context: List[Document]


def query_or_respond(state: MessagesState):
    prompt = [SystemMessage(content=load_prompt('prompts/qr-prompt')] + state['messages']
    llm_with_tools = llm.bind_tools([retrieve_from_local, web_search])
    response = llm_with_tools.invoke(prompt)
    return {'messages': [response]}


tools = ToolNode([retrieve_from_local, web_search])


def generate(state: MessagesState):
    recent_tool_msgs = []
    for message in reversed(state['messages']):
        if message.type == 'tool':
            recent_tool_msgs.append(message)
        else:
            break

    tool_msgs = recent_tool_msgs[::-1]

    docs_content = '\n\n'.join(
        doc.page_content
        for tool_msg in tool_msgs
        if tool_msg.artifact
        for doc, _ in tool_msg.artifact
    )
    if not docs_content:
        docs_content = 'Контекст отсутствует'
    system_message_content = (
        f"{load_prompt('prompts/generate-prompt.txt')}"
        f'{docs_content}'
    )
    conversation_msgs = [
        message
        for message in state['messages']
        if message.type in ('human', 'system')
        or (message.type == 'ai' and not message.tool_calls)
    ]
    prompt = [SystemMessage(system_message_content)] + conversation_msgs
    # may be a good idea to add prefixes to previous msgs or some introduction line

    response = llm.invoke(prompt)
    context = []

    for tool_msg in tool_msgs:
        if tool_msg.artifact is not None:
            context.extend(tool_msg.artifact)

    return {'messages': [response], 'context': context}

Создаем граф, добавляем в него ноды, настраиваем ребра, добавляем ему память, чтобы RAG помнил контекст разговора, собираем все в кучу

In [28]:
graph_builder = StateGraph(State)

graph_builder.add_node(query_or_respond)
graph_builder.add_node(tools)
graph_builder.add_node(generate)

graph_builder.set_entry_point('query_or_respond')
graph_builder.add_conditional_edges(
    'query_or_respond',
    tools_condition,
    {END: END, 'tools': 'tools'}
)

graph_builder.add_edge('tools', 'generate')
graph_builder.add_edge('generate', END)

memory = MemorySaver()

graph = graph_builder.compile(checkpointer=memory)

config = {'configurable': {'thread_id': 'abc123'}}

Далее пишем функцию, обрабатывающую входящее сообщение и дающую на него ответ в требуемом по ТЗ формате. Возвращаем источники, как для поика по локальной базе, так и для поиска с помощью DuckDuckGo:

In [29]:
def process_input_message(session_id: str, input_message: str):
    response = graph.invoke(
        {'messages': [{'role': 'user', 'content': input_message}], 'context': []},
        stream_mode='values',
        config=config
    )

    src = []
    if response.get('context') and len(response['context']) > 0:
        for doc in response['context']:
            if isinstance(doc, dict):
                src.append({
                    'source': doc['link'],
                    'snippet': doc['snippet']
                })
            if isinstance(doc, tuple):
                src.append({
                    'source': doc[0].metadata.get('source', 'unknown'),
                    'snippet': f'relevance: {doc[1]} ' + doc[0].page_content[:150] + '...'
                    # добавил скор в вывод для наглядности
                })
            else: src.append({
                'source': 'unknown',
                'snippet': f'Были получены неожиданные результаты поиска: {type(doc)}'
                })

    return {
        'answer': response['messages'][-1].content
        if response.get('messages') else "Ответ не получен.",
        'source_documents': src,
        'session_id': session_id
    }

Создаем инстанс FastAPI и три класса, наследующих от pydantic BaseModel - с их помощью будем следить за структурой генерируемых запросов и ответов от API.

Пишем функцию ask_question с декоратором app.post('/ask, ...). С его помощью будем ловить POST запросы к API и обрабатывать их, возвращая ответ.

В отдельном потоке запускаем локальный сервер на uvicorn'е с нашей API-шкой

In [30]:
api.set_process_function(process_input_message)

Запускаем локальный сервер на uvicorn'е, пишем будущий json запрос, передаем его POST запросом на сервер, получаем ответ

In [32]:
query = {
    'session_id': 'abc123',
    'question': 'В каких областях Neoflex обладает экспертизой?'
}

response = requests.post('http://127.0.0.1:8000/ask', json=query)
print(json.dumps(response.json(), indent=4, ensure_ascii=False))

{
    "answer": "Neoflex обладает экспертизой в области Site Reliability Engineering, DevOps, разработки Data-платформ, MLOps, трансформации приложений и инструментов, а также в создании платформ для обработки и хранения данных.",
    "source_documents": [
        {
            "source": "https://www.neoflex.ru/expertises/sre",
            "snippet": "relevance: 0.6709694879472464 passage: Neoflex — Экспертиза — Site Reliability Engineering\nDevOps\nSite Reliability Engineering\nРазработка Data-платформ\nТрансформация приложений, ин..."
        },
        {
            "source": "https://www.neoflex.ru/expertises/big-data",
            "snippet": "relevance: 0.6659197877406068 passage: Neoflex — Экспертиза — Разработка Data-платформ\nSite Reliability Engineering\nРазработка Data-платформ\nMLOps\nСоздаем платформы для обработки и..."
        }
    ],
    "session_id": "abc123"
}
