In [12]:
import sys
sys.path.append(r'C:\Users\Erik\Documents\VSCode\PC-Build.AI')

In [None]:
from langgraph.prebuilt import tools_condition
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from typing import Literal
# from src.utils.utilities import create_entry_node, create_tool_node_with_fallback
from src.agent_shema.build_assistants import Assistant, ToPCBuildAssistant, ToPriceValidationCheckerAssistant
from src.agent_shema.agent_runnables import AIAgentRunnables
from src.agent_shema.complete_or_escalate import CompleteOrEscalate
from src.agent_shema.build_agent_state import State
from langchain_core.messages import ToolMessage
from load_config import LoadConfig
CFG = LoadConfig()
AGENT_RUNNABLES = AIAgentRunnables()

In [None]:
from src.tools.bottle_neck import calculate_bottleneck
from src.tools.game_runner import game_run_tool
from src.tools.regard_parser import regard_parser_tool
from src.tools.sql_agent_tools import pc_builder_tool, question_answer_tool, SQLAgent


In [7]:
from pyprojroot import here
from pydantic import conint, field_validator
from sqlalchemy import text, create_engine
db_path = str(here("")) + "\\pc_accessories_2.db"
db_path = f"sqlite:///{db_path}"
db_path

engine = create_engine(db_path)

In [8]:
from typing import Annotated, Literal, Optional
from typing_extensions import TypedDict
from langgraph.graph.message import AnyMessage, add_messages


def update_dialog_stack(left: list[str], right: Optional[str]) -> list[str]:
    """
    Push or pop the state: Updates the dialog stack by either adding a new state or removing the last state.

    Args:
        left (list[str]): The current state of the dialog stack, represented as a list of strings.
        right (Optional[str]): The operation to perform. If `right` is None, the function returns the current state.
                               If `right` is "pop", the last element of the stack is removed. Otherwise, `right` is 
                               appended to the stack.

    Returns:
        list[str]: The updated dialog stack.
    """
    if right is None:
        return left
    if right == "pop":
        return left[:-1]
    return left + [right]


In [9]:
from typing import Literal, Annotated, List, Optional, Callable, Dict, Any
from typing_extensions import TypedDict
from langchain_core.messages import BaseMessage, ToolMessage
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import tools_condition
from typing import Literal, Annotated, List
from typing_extensions import TypedDict
from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages
import os

In [None]:
from langchain_core.messages import ToolMessage
from langchain_core.runnables import RunnableLambda
from langgraph.prebuilt import ToolNode
from typing import Literal, Optional, Annotated
from langchain_core.runnables import Runnable, RunnableConfig
from langchain_core.messages import ToolMessage, AIMessage, BaseMessage
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import tools_condition
from typing_extensions import TypedDict
from langchain_core.prompts import ChatPromptTemplate


class State(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]
    dialog_state: Annotated[
        list[Literal["assistant", "build_pc", "validate_price"]],
        update_dialog_stack,
    ]
    
    
def handle_tool_error(state) -> dict:
    """
    Handles errors by formatting them into a message and adding them to the chat history.

    This function retrieves the error from the given state and formats it into a `ToolMessage`, which is then
    added to the chat history. It uses the latest tool calls from the state to attach the error message.

    Args:
        state (dict): The current state of the tool, which includes error information and tool calls.

    Returns:
        dict: A dictionary containing a list of `ToolMessage` objects with error information.
    """
    error = state.get("error")
    tool_calls = state["messages"][-1].tool_calls
    return {
        "messages": [
            ToolMessage(
                content=f"Error: {repr(error)}\n please fix your mistakes.",
                tool_call_id=tc["id"],
            )
            for tc in tool_calls
        ]
    }


def create_tool_node_with_fallback(tools: list) -> dict:
    """
    Creates a `ToolNode` with fallback error handling.

    This function creates a `ToolNode` object and configures it to use a fallback function for error handling. 
    The fallback function handles errors by calling `handle_tool_error`.

    Args:
        tools (list): A list of tools to be included in the `ToolNode`.

    Returns:
        dict: A `ToolNode` configured with fallback error handling.
    """
    return ToolNode(tools).with_fallbacks(
        [RunnableLambda(handle_tool_error)], exception_key="error"
    )
    
def create_entry_node(assistant_name: str, new_dialog_state: str) -> Callable:
    """
    Создает функцию для перехода в новый этап диалога с указанием состояния и инструмента.

    Аргументы:
        assistant_name (str): Название помощника, который будет использоваться в сообщении.
        new_dialog_state (str): Новое состояние диалога после перехода.

    Возвращает:
        Callable: Функция, которая при вызове с объектом `State` возвращает словарь с информацией об инструменте и обновленным состоянием диалога.

    Функция выполняет следующие действия:
        - Извлекает `tool_call_id` из последнего сообщения с первым вызовом инструмента в объекте `State`.
        - Формирует сообщение для инструмента, информируя пользователя о том, что сейчас активен указанный помощник.
        - Обновляет состояние диалога с учетом нового состояния.
        - Сообщение инструмента информирует, что задача не завершена, пока не будет успешно вызван необходимый инструмент.
        - Если пользователь изменит свое решение или потребуется помощь по другим вопросам, сообщение предлагает вызвать функцию `CompleteOrEscalate`, чтобы вернуть управление основному ассистенту.
    """
    def entry_node(state: State) -> dict:
        tool_call_id = state["messages"][-1].tool_calls[0]["id"]
        return {
            "messages": [
                ToolMessage(
                    content=(
                        f"Ассистент {assistant_name} в настоящее время активен. Пожалуйста, просмотрите предыдущую переписку "
                        f"между основным ассистентом и пользователем. Цель пользователя еще не достигнута. "
                        f"Используйте предоставленные инструменты для выполнения задачи. Помните, вы – {assistant_name}, и действие "
                        "не будет считаться завершенным, пока необходимый инструмент не будет успешно вызван. "
                        "Если пользователь изменит свое решение или потребуется помощь по другим вопросам, вызовите функцию CompleteOrEscalate, "
                        "чтобы вернуть управление основному ассистенту."
                    ),
                    tool_call_id=tool_call_id,
                )
            ],
            "dialog_state": new_dialog_state,
        }

    return entry_node



primary_assistant_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "Вы являетесь основным ассистентом, ответственным за маршрутизацию пользовательских запросов, связанных со сборкой ПК, проверкой совместимости компонентов, валидацией цен и оценкой производительности. "
            "При получении запроса следуйте следующим правилам маршрутизации:\n\n"
            "1. Если запрос общий, связан со сборкой ПК или выбором компонентов, то направляйте его к помощнику по сборке ПК (pc build assistant).\n\n"
            "2. Если запрос касается вопросов совместимости компонентов, выбора или сборки ПК, также направляйте его к помощнику по сборке ПК.\n\n"
            "3. Если запрос связан с проверкой актуальных цен, валидацией цен или изменениями цен, направляйте его к помощнику по валидации цен (Price Validation Assistant).\n\n"
            "4. Если запрос касается определения узких мест (например, выяснения, насколько процессор ограничивает работу видеокарты) или проверки того, сможет ли игра работать корректно, направляйте его к помощнику по валидации цен.\n\n"
            "Не упоминайте названия специализированных ассистентов пользователю – делегируйте задачи тихо через вызовы функций. "
        ),
        ("placeholder", "{messages}"),
    ]
)
pc_info_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "Вы – специализированный ассистент, сосредоточенный на предоставлении подробной информации о компьютерных компонентах и сборке ПК. "
            "Всякий раз, когда основной ассистент нуждается в помощи с поиском или проверкой информации о компонентах (например, процессоры, видеокарты, оперативная память и т.д.), задача делегируется вам. "
            "Ваша задача – отвечать на вопросы о характеристиках и совместимости компонентов ПК. Вы можете предоставить детальную информацию о процессорах, видеокартах, материнских платах, памяти и других компонентах. "
            "Кроме того, вы способны помочь пользователю собрать ПК, учитывая его требования, бюджет и предполагаемое использование (например, для игр, офисной работы, стриминга и т.д.). "
            "Если пользователь просит рекомендацию или нуждается в помощи по определению совместимости компонентов, вы должны использовать доступные инструменты для сбора необходимой информации. "
            "Ваша цель – давать четкие, точные и полезные ответы пользователю и помогать в сборке функционального и сбалансированного ПК.\n\n"
            "Некоторые примеры вопросов, на которые вы можете ответить:\n"
            "- 'Каково энергопотребление процессора Intel Core i9-12900K?'\n"
            "- 'Можно ли использовать 64 ГБ оперативной памяти с материнской платой MSI B550?'\n"
            "- 'Сколько стоит AMD Ryzen 5 5600X?'\n"
            "- 'Какова производительность RTX 3070 в играх с разрешением 4K?'\n"
            "- 'Можете помочь собрать игровой ПК с бюджетом в $1500?'\n\n"
            "Если пользователь не уверен в выборе компонентов для сборки, обратитесь к соответствующим инструментам для подбора деталей с учетом его бюджета и потребностей. "
            "Если запрос неясен или вы не можете найти необходимую информацию, эскалируйте задачу основному ассистенту.\n\n"
            "Если пользователь запрашивает сборку ПК, но не указывает бюджет или тип использования, эскалируйте задачу обратно основному ассистенту."
        ),
        ("placeholder", "{messages}"),
    ]
)
        

price_validation_checker_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "Вы – ассистент 'validate_price', отвечающий за задачи, связанные с ценообразованием компонентов, проверкой совместимости игр и анализом узких мест. "
            "Ваши задачи включают следующее:\n"
            "1. **Поиск лучших цен** на компоненты (например, CPU, GPU, RAM и т.д.) с использованием инструмента `regard_parser_tool`.\n"
            "2. **Проверка совместимости игр** с использованием инструмента `game_run_tool`. Вам будет предоставлено название игры и выбранные пользователем процессор, видеокарта и объем оперативной памяти, чтобы определить, соответствуют ли эти компоненты системным требованиям игры.\n"
            "3. **Анализ узких мест между CPU и GPU** с использованием инструмента `calculate_bottleneck`. Это помогает определить, будет ли выбранное сочетание процессора и видеокарты работать оптимально.\n"
            "4. **Обработка как структурированного, так и неструктурированного ввода:** Вы можете получить данные в виде структурированного JSON или текстового запроса (например, 'Мне нужен ПК для стриминга'). Если ввод является строкой, извлеките соответствующие компоненты и обработайте их как структурированные данные.\n"
            "\n\n### Инструменты, которые будут использоваться:\n"
            "- **`regard_parser_tool`**: Используйте этот инструмент для поиска лучших цен на указанные компоненты (например, CPU, GPU, RAM) и возврата соответствующих ценовых данных.\n"
            "- **`game_run_tool`**: Используйте этот инструмент для проверки, соответствуют ли указанные CPU, GPU и RAM системным требованиям для конкретной игры.\n"
            "- **`calculate_bottleneck`**: Используйте этот инструмент для проверки, приведет ли сочетание CPU и GPU к ограничению производительности (узкому месту).\n"
            "\n\n### Шаги для обработки запросов:\n"
            "1. Если ввод структурирован (в формате JSON), извлеките соответствующие компоненты и обработайте их с помощью соответствующих инструментов.\n"
            "2. Если ввод является неструктурированной строкой, определите упомянутые компоненты (например, 'видеокарта RTX 3070', 'процессор Intel Core i7') и обработайте их как структурированные данные.\n"
            "3. Для анализа узких мест, если предоставлены и процессор, и видеокарта, используйте `calculate_bottleneck`, чтобы проверить, не ограничивает ли процессор производительность видеокарты.\n"
            "4. Если информация неполная или отсутствует, сообщите об этом пользователю и запросите дополнительные детали для продолжения.\n"
            "\n### Примеры задач:\n"
            "- **Поиск цен**: 'Какова цена процессора Intel Core i7-12700 и видеокарты GeForce RTX 3070?'\n"
            "- **Проверка совместимости игр**: 'Запустится ли моя система (Intel Core i7, GeForce RTX 3070, 16GB RAM) Cyberpunk 2077?'\n"
            "- **Проверка узкого места между CPU и GPU**: 'Можно ли сочетать Intel Core i7-12700 с GeForce RTX 3070 без узкого места?'\n"
            "- **Неструктурированные запросы**: 'Мне нужен ПК для стриминга с видеокартой Nvidia, 32GB RAM и блоком питания 850W.'"
        ),
        ("placeholder", "{messages}"),
    ]
)



primary_assistant_tools = [
    ToPCBuildAssistant,
    ToPriceValidationCheckerAssistant,
]
primary_assistant_runnable = primary_assistant_prompt | CFG.llm.bind_tools(
    primary_assistant_tools + [CompleteOrEscalate]
)

pc_build_tools = [pc_builder_tool, question_answer_tool]
pc_build_runnable = pc_info_prompt | CFG.llm.bind_tools(
    pc_build_tools + [CompleteOrEscalate]
)

price_validation_checker_tools = [regard_parser_tool, calculate_bottleneck, game_run_tool]
price_validation_checker_runnable = price_validation_checker_prompt | CFG.llm.bind_tools(
    price_validation_checker_tools + [CompleteOrEscalate]
)


def _print_event(event: dict, _printed: set, max_length=1500):
    """
    Выводит текущее состояние и сообщения события, с возможностью обрезки длинных сообщений.

    Эта функция выводит информацию о текущем состоянии диалога и последнем сообщении в событии. Если сообщение слишком длинное, 
    оно будет обрезано до указанной максимальной длины. В случае повторных сообщений с одинаковым ID, они не будут повторно выведены.

    Логирование включает информацию о:
    - Текущем состоянии диалога.
    - Последнем сообщении.
    - Параметрах обрезки длинных сообщений.
    - Избежании дублирования сообщений через их уникальные ID.

    Args:
        event (dict): Словарь, содержащий состояние диалога и сообщения.
        _printed (set): Множество ID сообщений, которые уже были выведены, для предотвращения дублирования.
        max_length (int, optional): Максимальная длина сообщения до его обрезки. По умолчанию — 1500 символов.

    Логирование:
        1. Если состояние диалога доступно, будет выведено текущее состояние (последний элемент в списке состояний).
        2. Если сообщение доступно, проверяется его длина:
            - Если сообщение слишком длинное, оно обрезается с добавлением текста "(truncated)".
            - Логируется только первое сообщение с уникальным ID, чтобы избежать дублирования.
        3. Вся информация, включая состояния и сообщения, записывается в консоль или в журнал (в зависимости от настройки логирования).
    """
    # Логирование состояния диалога
    current_state = event.get("dialog_state")
    if current_state:
        state_info = current_state[-1]
        print(f"[INFO] Current dialog state: {state_info}")
    
    # Логирование сообщений
    message = event.get("messages")
    if message:
        if isinstance(message, list):
            message = message[-1]  # Получаем последнее сообщение из списка сообщений
        
        if message.id not in _printed:
            # Ожидаем, что у сообщения будет метод pretty_repr для корректного представления
            msg_repr = message.pretty_repr(html=True)  # Выводим сообщение в читаемом формате (с HTML разметкой)
            
            # Логируем длину сообщения перед обрезкой
            if len(msg_repr) > max_length:
                print(f"[WARNING] Message is too long ({len(msg_repr)} characters), truncating to {max_length} characters.")
                msg_repr = msg_repr[:max_length] + " ... (truncated)"
            
            # Выводим сообщение в консоль
            print(f"[INFO] Message content:\n{msg_repr}")
            
            # Добавляем ID сообщения в список выведенных
            _printed.add(message.id)
        else:
            # Логируем, что сообщение уже было выведено
            print(f"[INFO] Message with ID {message.id} already printed, skipping.")


In [None]:
from langchain_core.runnables import Runnable, RunnableConfig
# from langchain_core.pydantic_v1 import BaseModel, Field
from pydantic import BaseModel, Field


class Assistant:
    """
    A class to manage interactions with a runnable agent and ensure valid responses.

    Attributes:
        runnable (Runnable): An instance of the Runnable class used to invoke actions and obtain results.

    Methods:
        __call__(state: State, config: RunnableConfig) -> dict:
            Executes the runnable with the provided state and configuration, and handles invalid responses by updating
            the state with appropriate messages until a valid response is obtained.

    Args:
        runnable (Runnable): The runnable instance that performs the actual work and provides the result.
    """

    def __init__(self, runnable: Runnable):
        """
        Initializes the Assistant with a runnable instance.

        Args:
            runnable (Runnable): An instance of the Runnable class to be used for invoking actions.
        """
        self.runnable = runnable

    def __call__(self, state: State, config: RunnableConfig):
        """
        Executes the runnable with the given state and configuration, and ensures the response is valid.

        The method continuously invokes the runnable until a valid response is obtained. If the response is invalid (e.g., 
        no tool calls and empty or invalid content), it updates the state with a message prompting for a real output.

        Args:
            state (State): The current state of the agent, including messages and other relevant information.
            config (RunnableConfig): Configuration settings for the runnable.

        Returns:
            dict: A dictionary containing the updated messages and result from the runnable invocation.

        Example:
            result = self(state, config)
        """
        while True:
            result = self.runnable.invoke(state)

            if not result.tool_calls and (
                not result.content
                or isinstance(result.content, list)
                and not result.content[0].get("text")
            ):
                messages = state["messages"] + \
                    [("user", "Respond with a real output.")]
                state = {**state, "messages": messages}
                messages = state["messages"] + \
                    [("user", "Respond with a real output.")]
                state = {**state, "messages": messages}
            else:
                break
        return {"messages": result}
    
class ToPCBuildAssistant(BaseModel):
    """Transfers work to a specialized assistant to handle PC building requests and answer questions about components."""
    user_input: Optional[str] = Field(default=None, description="A question about specific components or compatibility or about PC build.")

    class Config:
        schema_extra = {
            "example": 'Собери мне компьютер за 200000 с поддержкой 4к'
        }

    
    
# class ToPriceValidationCheckerAssistant(BaseModel):
#     """
#     Transfers work to a specialized assistant to check prices of some components 
#     or to check if the selected components (CPU, GPU, RAM) can run a specific game.
    
#     Входные данные:
#       - input_data: словарь с ключами "game_name", "cpu", "gpu", "ram" и опционально "memory"
      
#     """
#     input_data: Dict[str, Any] = Field(
#         ...,
#         description=(
#             "Словарь, содержащий параметры для проверки совместимости: "
#             "game_name, cpu, gpu, ram, и опционально memory. "
#             "Пример: { 'game_name': 'Cyberpunk 2077', 'cpu': 'Intel Core i7-12700', "
#             "'gpu': 'RTX 3070', 'ram': 16, 'memory': 16 }"
#         )
#     )
   
#     class Config:
#         schema_extra = {
#             "example": {
#                 "input_data": {
#                     "game_name": "Cyberpunk 2077",
#                     "cpu": "Intel Core i7-12700",
#                     "gpu": "RTX 3070",
#                     "ram": 16,
#                     "memory": 16
#                 },
               
#             }
#         }


from pydantic import BaseModel, Field
from typing import Optional, List, Union


# Вариант 1: Входные данные для game_run_tool
class GameRunInput(BaseModel):
    game_name: str = Field(description="Название игры, которую нужно проверить.")
    cpu: str = Field(description="Процессор пользователя.")
    gpu: str = Field(description="Видеокарта пользователя.")
    ram: int = Field(description="Объем оперативной памяти в гигабайтах.")


# Вариант 2: Входные данные для calculate_bottle_neck
class BottleNeckInput(BaseModel):
    cpu: str = Field(description="Процессор для проверки узкого горлышка.")
    gpu: str = Field(description="Видеокарта для проверки узкого горлышка.")
    resolution: str = Field(description="Разрешение экрана (например, '1440p').")
    
    

class CPU(BaseModel):
    cpu: str = Field(..., description="Модель процессора", example="Intel Core i7-12700")

class GPU(BaseModel):
    gpu: str = Field(..., description="Модель видеокарты", example="GeForce RTX 3070")

class Memory(BaseModel):
    name: str = Field(..., description="Название памяти", example="Corsair Vengeance 16 GB")

class Corpus(BaseModel):
    name: str = Field(..., description="Модель корпуса", example="Cooler Master MasterBox")

class PowerSupply(BaseModel):
    name: str = Field(..., description="Модель блока питания", example="Be Quiet! Pure Power 11 600W")

class Motherboard(BaseModel):
    name: str = Field(..., description="Модель материнской платы", example="ASUS ROG Strix Z690-F")

# Объединение всех возможных компонентов
Component = Union[CPU, GPU, Memory, Corpus, PowerSupply, Motherboard]

# Модель для входных данных
class RegardInput(BaseModel):
    components: Optional[List[Component]] = Field(
        None,
        description="Список компонентов для анализа. Может быть пустым или содержать только часть компонентов.",
        example=[
            {"cpu": "Intel Core i7-12700"},
            {"gpu": "GeForce RTX 3070"},
            {"name": "Corsair Vengeance 16 GB"},
            {"name": "Cooler Master MasterBox"},
            {"name": "Be Quiet! Pure Power 11 600W"},
            {"name": "ASUS ROG Strix Z690-F"}
        ]
    )





# Основной класс с учетом всех вариантов
class ToPriceValidationCheckerAssistant(BaseModel):
    input_data: Union[GameRunInput, BottleNeckInput, RegardInput] = Field(
        ..., description="Входные данные для проверки совместимости или поиска компонентов."
    )

    class Config:
        json_schema_extra = {
            "examples": [
                {
                    "description": "Проверка запуска игры (game_run_tool)",
                    "input_data": {
                        "game_name": "Cyberpunk 2077",
                        "cpu": "Intel Core i7-12700",
                        "gpu": "RTX 3070",
                        "ram": 16
                    }
                },
               
            ]
        }


In [104]:
def leave_skill(state: State) -> dict:
    """Завершение работы с подассистентом и возвращение в основной ассистент"""
    messages = []
    if state["messages"][-1].tool_calls:
        messages.append(
            ToolMessage(
                content="Завершаем текущую задачу и возвращаемся к основному ассистенту.",
                tool_call_id=state["messages"][-1].tool_calls[0]["id"],
            )
        )
    return {"dialog_state": "pop", "messages": messages}

In [105]:
class AgenticGraph:
    def __init__(self) -> None:
        self.builder = StateGraph(State)
        self.builder.add_node("fetch_user_info", self.fetch_user_info)
        self.builder.add_edge(START, "fetch_user_info")

    def fetch_user_info(self, state: State):
        """
        Сбор информации о пользователе. Это определяет, какие данные нужно собирать в контексте запроса.
        Например, информация о запросах на сборку ПК или проверку совместимости.
        """
        user_query = state["messages"][-1].content.lower()  # Получаем последний запрос пользователя
        
        return {"info": user_query}
    
    

    # ===========================
    # PC Build Assistant
    # ===========================
    def add_pc_build_nodes_to_graph(self):
        self.builder.add_node(
            "enter_build_pc",
            create_entry_node("PC Build Assistant", "build_pc"),
        )
        self.builder.add_node("build_pc", Assistant(pc_build_runnable))
        self.builder.add_edge("enter_build_pc", "build_pc")
        self.builder.add_node(
            "build_pc_tools",
            create_tool_node_with_fallback(pc_build_tools),
        )

        def route_build_pc(state: State) -> Literal["build_pc_tools", "leave_skill", "__end__"]:
            
            route = tools_condition(state)
            if route == END:
                return END
            tool_calls = state["messages"][-1].tool_calls
            did_cancel = any(
                tc["name"] == CompleteOrEscalate.__name__ for tc in tool_calls
            )
            if did_cancel:
                return "leave_skill"
            return "build_pc_tools"
        
        self.builder.add_edge("build_pc_tools", "build_pc")
        self.builder.add_conditional_edges("build_pc", route_build_pc)

        

        self.builder.add_node("leave_skill", leave_skill)
        self.builder.add_edge("leave_skill", "primary_assistant")

    # ===========================
    # Price Validation Checker
    # ===========================
    def add_price_validation_nodes_to_graph(self):
        
        self.builder.add_node(
            "enter_validate_price",
            create_entry_node("Price Validation Assistant", "validate_price"),
        )
        self.builder.add_node("validate_price", Assistant(price_validation_checker_runnable))
        self.builder.add_edge("enter_validate_price", "validate_price")
        self.builder.add_node(
            "price_validation_tools",
            create_tool_node_with_fallback(price_validation_checker_tools),
        )
        
   
        def route_validate_price(
            state: State) -> Literal[
            "price_validation_tools", "leave_skill", "__end__"]:
            route = tools_condition(state)
            if route == END:
                return END
            tool_calls = state["messages"][-1].tool_calls
            did_cancel = any(
                tc["name"] == CompleteOrEscalate.__name__ for tc in tool_calls)
            if did_cancel:
                return "leave_skill"
            return "price_validation_tools"
        
        self.builder.add_edge("price_validation_tools", "validate_price")
        self.builder.add_conditional_edges("validate_price", route_validate_price)

        if "leave_skill" not in self.builder.nodes:
            self.builder.add_node("leave_skill", leave_skill)
        self.builder.add_edge("leave_skill", "primary_assistant")

    # ===========================
    # Primary Assistant
    # ===========================
    def add_primary_assistant_nodes_to_graph(self):
        self.builder.add_node("primary_assistant", Assistant(primary_assistant_runnable))
        self.builder.add_node(
            "primary_assistant_tools", create_tool_node_with_fallback(primary_assistant_tools)
        )

        def route_primary_assistant(state: State) -> Literal["primary_assistant_tools", "enter_build_pc", "enter_validate_price", "__end__"]:
            print(f"Состояние перед маршрутом: {state}")
            
            # Check the route condition first (you can use any existing condition function here)
            route = tools_condition(state)
            if route == END:
                return END
            
            tool_calls = state["messages"][-1].tool_calls
            
            if tool_calls:
                tool_name = tool_calls[0]["name"]
                print(f"Используется инструмент: {tool_name}")
                if tool_calls[0]["name"] == ToPCBuildAssistant.__name__:
                    print("Переход на маршрут 'enter_build_pc'")
                    return "enter_build_pc"
                
                elif tool_calls[0]["name"] == ToPriceValidationCheckerAssistant.__name__:
                    print("Переход на маршрут 'enter_validate_price'")
                    return "enter_validate_price"
                else:
                    print(f"Ошибка: инструмент {tool_name} не поддерживается.")
                    return "primary_assistant_tools"
            raise ValueError("Invalid route")
            
                
            
        self.builder.add_conditional_edges(
            "primary_assistant",
            route_primary_assistant,
            {
                "enter_build_pc": "enter_build_pc",
                "enter_validate_price": "enter_validate_price",
                "primary_assistant_tools": "primary_assistant_tools",
                END: END,
            },
        )
        self.builder.add_edge("primary_assistant_tools", "primary_assistant")

        def route_to_workflow(state: State) -> Literal[
            "primary_assistant", "build_pc", "validate_price"
        ]:
            dialog_state = state.get("dialog_state")
            if not dialog_state:
                return "primary_assistant"
            return dialog_state[-1]

        self.builder.add_conditional_edges("fetch_user_info", route_to_workflow)

    def Compile_graph(self):
        self.add_pc_build_nodes_to_graph()
        self.add_price_validation_nodes_to_graph()
        self.add_primary_assistant_nodes_to_graph()

        # Compile graph
        memory = MemorySaver()
        graph = self.builder.compile(checkpointer=memory)
        return graph


С логированием

In [106]:
# Создаем экземпляр графа
agentic_graph = AgenticGraph()

# Компилируем граф, добавляя все необходимые узлы и связи
compiled_graph = agentic_graph.Compile_graph()


In [None]:
from IPython.display import Image, display

try:
    display(Image(compiled_graph.get_graph(xray=True).draw_mermaid_png()))
except Exception:
    # This requires some extra dependencies and is optional
    pass

In [107]:
import uuid
thread_id = str(uuid.uuid4())

config = {
    "configurable": {
        # The passenger_id is used in our flight tools to
        # fetch the user's flight information
        
        # Checkpoints are accessed by thread_id
        "thread_id": thread_id,
    }
}

In [None]:
sample_questions = [
    "Можешь найти актуальные цены на видеокарту RTX 4070",
    # "Подскажи, какие компоненты стоит использовать для сбалансированной сборки?",
    # "Какие основные характеристики нужно учитывать для выбора комплектующих?",
    # "Какой процессор будет оптимальным для современных игр?",
    # "Насколько выбранный процессор раскрывает потенциал видеокарты при стандартном разрешении?",
    # "Какой компонент может стать узким местом в данной конфигурации?",
    # "Пойдёт ли Cyberpunk 2077 игра на подобной системе?"
]


_printed = set()
# We can reuse the tutorial questions from part 1 to see how it does.
for question in sample_questions:
    events = compiled_graph.stream(
        {"messages": ("user", question)}, config, stream_mode="values"
    )
    for event in events:
        _print_event(event, _printed)
    snapshot = compiled_graph.get_state(config)
    while snapshot.next:
        # We have an interrupt! The agent is trying to use a tool, and the user can approve or deny it
        # Note: This code is all outside of your graph. Typically, you would stream the output to a UI.
        # Then, you would have the frontend trigger a new run via an API call when the user has provided input.
        
        # Satisfy the tool invocation by
        # providing instructions on the requested changes / change of mind
        result = compiled_graph.invoke(
            {
                "messages": [
                    ToolMessage(
                        tool_call_id=event["messages"][-1].tool_calls[0]["id"],
                        content=f"API call denied by user. Reasoning: '{user_input}'. Continue assisting, accounting for the user's input.",
                    )
                ]
            },
            config,
            )
        snapshot = compiled_graph.get_state(config)