In [1]:
import json
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END, MessagesState
from langgraph.prebuilt import ToolNode
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_community.tools import TavilySearchResults
#from langchain.agents import load_tools
from langchain_core.tools import tool
import os
from langgraph.graph import StateGraph, MessagesState

import pandas as pd
from time import sleep

# Устанавливаем API-ключи
os.environ["TAVILY_API_KEY"] = ""
os.environ["VSEGPT_API_KEY"] = ""

# Tavily для поиска в интернете
search_tool = TavilySearchResults(max_results=3, include_answer=True, include_raw_content=False)

# Human tool для взаимодействия с пользователем
#human_tool = load_tools(["human"])[0]



# Список инструментов в корректном формате
tools = [search_tool]

# Создаем LLM-модель для работы с VseGPT API
llm = ChatOpenAI(
    model="openai/gpt-4o-mini",  # Используем нужную модель VseGPT
    temperature=0.7,
    max_tokens=None,
    openai_api_base="https://api.vsegpt.ru/v1",  # Указываем кастомный URL API
    openai_api_key=os.getenv("VSEGPT_API_KEY"),# Берем API-ключ из переменной окружения
    ).bind_tools(tools)  # <-- Теперь инструменты передаются корректно

print("Модель VseGPT успешно инициализирована!")

  search_tool = TavilySearchResults(max_results=3, include_answer=True, include_raw_content=False)


Модель VseGPT успешно инициализирована!


Фабрика LLM

In [2]:
def make_llm(model: str, temperature: float, max_tokens: int | None, tools: list):
    return ChatOpenAI(
        model=model,
        temperature=temperature,
        max_tokens=max_tokens,
        openai_api_base="https://api.vsegpt.ru/v1",
        openai_api_key=os.getenv("VSEGPT_API_KEY"),
    ).bind_tools(tools)

## Состояние графа

In [3]:
from typing import TypedDict, List, Dict, Any, Literal, Optional


# Все роли, которые теперь поддерживаются
Role = Literal[
    "search",   # Retrieval
    "document",  # Document / Media processing (pdf, images, video)
    "extract",  # Extraction / Parsing
    "reason",   # Reasoning
    "calc",     # Calculation / Code
    "critic",   # Verification / Critic
    "answer"    # Summarization / Final answer
]


Status = Literal["pending", "in_progress", "done"]


class Task(TypedDict):
    id: str
    role: Role
    description: str
    depends_on: List[str]
    priority: int
    status: Status
    # result теперь может быть строкой, dict (JSON), списком и т.д.
    result: Optional[Any]


class ToolsPolicy(TypedDict, total=False):
    allow_search: bool
    allow_human: bool
    allow_python: bool  # полезно для calc-агента (PythonREPLTool)


class ResourceConfig(TypedDict, total=False):
    model: str
    max_tokens: int
    temperature: float
    tools_policy: ToolsPolicy


class FileRef(TypedDict, total=False):
    id: str
    name: str
    path: str
    source: str
    mime: str
    original_path: str

class GraphState(TypedDict, total=False):
    query: str
    tasks: list
    current_task_id: Optional[str]
    resources: dict
    intermediate_results: dict
    final_answer: Optional[str]

    # GAIA additions
    gaia_task_id: Optional[str]
    files: List[FileRef]
    data_dir: Optional[str]
    ground_truth: Optional[str]


## Ноды графа

Нам нужны:

- planner_node – планировщик

- oracle_node – оракул (ресурсы)

- select_task_node – выбирает следующую задачу

- route_by_role – condition / router по типу задачи

- search_agent_node

- critic_agent_node

- answer_agent_node

- finish_node – собирает финальный ответ (если нужно отдельно)

## Планировщик (PlannerNode)

In [4]:
import json
from typing import List, Dict, Any
from langchain_core.messages import SystemMessage, HumanMessage

# Допустимые роли (должны совпадать с твоим Role Literal)
ALLOWED_ROLES = {"search", "document", "extract", "reason", "calc", "critic", "answer"}


def _ensure_document_task(tasks: List["Task"], state: "GraphState") -> List["Task"]:
    """
    Гарантирует наличие document-задачи, если во входе есть файлы.
    Вставляет её после search (если search есть), иначе в начало.
    Также гарантирует, что extract (и при желании остальные) зависят от document.
    """
    has_files = bool(state.get("files"))
    if not has_files:
        return tasks  # нечего добавлять

    has_document = any(t.get("role") == "document" for t in tasks)
    if has_document:
        return tasks

    # Определяем позицию вставки: после search, если он первый; иначе в начало
    insert_at = 0
    if tasks and tasks[0].get("role") == "search":
        insert_at = 1

    doc_id = "TDOC"  # фиксированный ID, чтобы не перенумеровывать T1/T2/...

    depends_on = []
    if insert_at == 1:
        depends_on = [tasks[0]["id"]]

    doc_task: Task = {
        "id": doc_id,
        "role": "document",
        "description": (
            "Обработать входные файлы/медиа из state.files (pdf/изображения/видео): "
            "извлечь текст/описания/таблицы/страницы и сохранить в промежуточные результаты "
            "(например, state.intermediate_results[TDOC] или state.doc_cache)."
        ),
        "depends_on": depends_on,
        "priority": 1,
        "status": "pending",
        "result": None,
    }

    tasks.insert(insert_at, doc_task)

    # ВАЖНО: заставим extract зависеть от TDOC.
    # (Если хочешь жёстче — можно добавить зависимость и для reason/critic/answer, но обычно достаточно extract.)
    for t in tasks:
        if t.get("role") == "extract":
            deps = t.get("depends_on", [])
            if doc_id not in deps and t["id"] != doc_id:
                t["depends_on"] = deps + [doc_id]

    return tasks


def planner_node(state: "GraphState") -> "GraphState":
    query = state["query"]

    system_prompt = (
        "Ты — агент-планировщик мультиагентной системы.\n"
        "Разбей пользовательский запрос на последовательность подзадач.\n\n"
        "Доступные роли:\n"
        "- search: найти источники/факты в интернете (retrieval)\n"
        "- document: обработать локальные документы/медиа (pdf/изображения/видео), получить текст/описания/структуру\n"
        "- extract: извлечь структуру из найденного/прочитанного (термины, определения, числа, требования)\n"
        "- reason: построить логический вывод/план решения на основе извлечённого\n"
        "- calc: выполнить вычисления/код (когда нужны точные расчёты)\n"
        "- critic: проверить факты/логические ошибки/неподтверждённые утверждения\n"
        "- answer: написать финальный ответ пользователю\n\n"
        "Правила:\n"
        "1) Используй ТОЛЬКО роли из списка.\n"
        "2) Каждая задача должна быть атомарной и формулироваться как действие.\n"
        "3) Задай depends_on (зависимости) так, чтобы соблюдался порядок.\n"
        "4) Если нужны вычисления/точность — добавь задачу role=calc.\n"
        "5) Обычно хороший пайплайн: search -> (document?) -> extract -> reason -> (calc?) -> critic -> answer.\n"
        "6) Если есть входные файлы/медиа, добавь задачу role=document перед extract.\n"
        "7) Последняя задача ВСЕГДА role=answer.\n"
        "8) Верни СТРОГО JSON без пояснений.\n"
    )

    user_prompt = (
        f"Пользовательский запрос:\n{query}\n\n"
        "Сформируй план в JSON.\n"
        "Формат:\n"
        "{\n"
        '  "tasks": [\n'
        "    {\n"
        '      "id": "T1",\n'
        '      "role": "search|document|extract|reason|calc|critic|answer",\n'
        '      "description": "строка",\n'
        '      "depends_on": ["T0", "..."],\n'
        '      "priority": 1\n'
        "    }\n"
        "  ]\n"
        "}\n"
    )

    llm = make_llm(
        model="openai/gpt-4o-mini",
        temperature=0.2,
        max_tokens=800,
        tools=[],
    )

    response = llm.invoke([
        SystemMessage(content=system_prompt),
        HumanMessage(content=user_prompt),
    ])

    # --- Парсим JSON ---
    try:
        plan = json.loads(response.content)
        raw_tasks = plan["tasks"]
        if not isinstance(raw_tasks, list):
            raise ValueError("tasks is not a list")
    except Exception as e:
        raise ValueError(f"Planner вернул некорректный JSON:\n{response.content}") from e

    # --- Нормализация + валидация ---
    tasks: List[Task] = []
    for t in raw_tasks:
        role = t.get("role")
        if role not in ALLOWED_ROLES:
            raise ValueError(
                f"Planner вернул недопустимую роль: {role}. "
                f"Допустимые: {sorted(ALLOWED_ROLES)}"
            )

        tasks.append({
            "id": t["id"],
            "role": role,
            "description": t["description"],
            "depends_on": t.get("depends_on", []),
            "priority": int(t.get("priority", 1)),
            "status": "pending",
            "result": None,
        })

    # --- ВОТ СЮДА: вставка document-задачи, чтобы LLM не забывал ---
    tasks = _ensure_document_task(tasks, state)

    # --- Гарантируем, что последняя задача answer ---
    if tasks and tasks[-1]["role"] != "answer":
        last_id = tasks[-1]["id"]
        new_id = f"T{len(tasks) + 1}"
        tasks.append({
            "id": new_id,
            "role": "answer",
            "description": "Сформировать финальный ответ пользователю, используя результаты всех задач.",
            "depends_on": [last_id],
            "priority": tasks[-1]["priority"] + 1 if tasks else 1,
            "status": "pending",
            "result": None,
        })

    state["tasks"] = tasks
    return state


In [13]:
import json

state = {
    "query": "Объясни, как работает мультиагентная система для решения бенчмарка GAIA"
}

state = planner_node(state)

print("=== TASKS FROM PLANNER ===")
print(json.dumps(state["tasks"], ensure_ascii=False, indent=2))


=== TASKS FROM PLANNER ===
[
  {
    "id": "T1",
    "role": "search",
    "description": "Найти информацию о мультиагентных системах и их применении для решения бенчмарка GAIA.",
    "depends_on": [],
    "priority": 1,
    "status": "pending",
    "result": null
  },
  {
    "id": "T2",
    "role": "extract",
    "description": "Извлечь ключевые термины и определения из найденной информации о мультиагентных системах и GAIA.",
    "depends_on": [
      "T1"
    ],
    "priority": 2,
    "status": "pending",
    "result": null
  },
  {
    "id": "T3",
    "role": "reason",
    "description": "Построить логический вывод о том, как мультиагентные системы работают для решения бенчмарка GAIA на основе извлеченной информации.",
    "depends_on": [
      "T2"
    ],
    "priority": 3,
    "status": "pending",
    "result": null
  },
  {
    "id": "T4",
    "role": "critic",
    "description": "Проверить факты и логические выводы о мультиагентных системах и GAIA на наличие ошибок.",
    "depe

## Оракул (OracleNode)

In [5]:
import json
from typing import Dict, Any, List
from langchain_core.messages import SystemMessage, HumanMessage


def oracle_node(state: "GraphState") -> "GraphState":
    tasks: List["Task"] = state["tasks"]

    # === Правила и подсказки для LLM-Оракула ===
    oracle_system_prompt = (
        "Ты — Оракул ресурсов в мультиагентной системе.\n"
        "Твоя задача: распределить вычислительные ресурсы между подзадачами.\n\n"
        "Ресурсы включают:\n"
        "- model (строка)\n"
        "- max_tokens (целое)\n"
        "- temperature (0..1)\n"
        "- tools_policy (разрешения на инструменты)\n\n"
        "Доступные роли задач:\n"
        "- search: web-поиск и сбор фактов\n"
        "- document: обработка локальных файлов/медиа (pdf/изображения/видео), извлечение текста/описаний/структуры\n"
        "- extract: извлечение структуры/сущностей/определений из найденного/прочитанного\n"
        "- reason: рассуждение/план/вывод на основе extracted данных\n"
        "- calc: вычисления/код (Python)\n"
        "- critic: проверка фактов/логики/несостыковок\n"
        "- answer: финальный ответ пользователю\n\n"
        "Рекомендации по настройкам:\n"
        "- search: allow_search=True, температура умеренная (0.2–0.4)\n"
        "- document: температура низкая (0.0–0.2). allow_search обычно False.\n"
        "           Если нужно рендерить/обрабатывать много страниц — max_tokens выше (900–1400).\n"
        "- extract: температура низкая (0.0–0.2), allow_search обычно False\n"
        "- reason: температура 0.1–0.4, инструменты обычно не нужны\n"
        "- calc: allow_python=True, температура низкая (0.0–0.2)\n"
        "- critic: температура низкая (0.0–0.2)\n"
        "- answer: max_tokens обычно выше, чем у остальных\n\n"
        "Правила:\n"
        "1) Уважай глобальный бюджет total_token_budget.\n"
        "2) max_tokens для каждой задачи >= 200.\n"
        "3) Верни СТРОГО JSON без пояснений.\n"
    )

    total_token_budget = int(state.get("total_token_budget", 4000))

    # Каталог допустимых моделей (можешь расширить)
    allowed_models = [
        "openai/gpt-4o-mini",
        # "openai/gpt-4o",
        # "openai/gpt-4.1-mini",
        # "openai/gpt-4.1",
    ]

    oracle_user_prompt = {
        "total_token_budget": total_token_budget,
        "allowed_models": allowed_models,
        # чуть больше контекста для document: наличие файлов и их mime
        "context": {
            "has_files": bool(state.get("files")),
            "files": [
                {
                    "id": f.get("id"),
                    "name": f.get("name"),
                    "mime": f.get("mime"),
                    "path": f.get("path"),
                }
                for f in (state.get("files") or [])
            ][:10],  # не раздуваем промпт
        },
        "tasks": [
            {
                "id": t["id"],
                "role": t["role"],
                "description": t["description"],
                "depends_on": t.get("depends_on", []),
                "priority": t.get("priority", 1),
            }
            for t in tasks
        ],
        "required_output_schema": {
            "global": {"total_token_budget": "int"},
            "allocations": [
                {
                    "task_id": "str",
                    "model": "str",
                    "max_tokens": "int",
                    "temperature": "float",
                    "tools_policy": {
                        "allow_search": "bool",
                        "allow_human": "bool",
                        "allow_python": "bool"
                    }
                }
            ]
        }
    }

    oracle_llm = make_llm(
        model="openai/gpt-4o-mini",
        temperature=0.0,
        max_tokens=900,
        tools=[],
    )

    resp = oracle_llm.invoke([
        SystemMessage(content=oracle_system_prompt),
        HumanMessage(content=json.dumps(oracle_user_prompt, ensure_ascii=False)),
    ])

    # === Парсинг ===
    try:
        oracle_out = json.loads(resp.content)
    except Exception as e:
        raise ValueError(f"Oracle вернул невалидный JSON:\n{resp.content}") from e

    allocations = oracle_out.get("allocations", [])
    if not allocations or not isinstance(allocations, list):
        raise ValueError(f"Oracle: нет allocations или неверный формат:\n{resp.content}")

    resources: Dict[str, Any] = {}
    seen = set()

    # === Нормализация ответа оракула ===
    for a in allocations:
        task_id = a.get("task_id")
        if not task_id or task_id in seen:
            continue
        seen.add(task_id)

        model = a.get("model")
        if model not in allowed_models:
            model = "openai/gpt-4o-mini"

        try:
            max_tokens = int(a.get("max_tokens", 400))
        except Exception:
            max_tokens = 400
        max_tokens = max(200, max_tokens)

        try:
            temperature = float(a.get("temperature", 0.2))
        except Exception:
            temperature = 0.2
        temperature = max(0.0, min(1.0, temperature))

        tp = a.get("tools_policy", {}) or {}
        resources[task_id] = {
            "model": model,
            "max_tokens": max_tokens,
            "temperature": temperature,
            "tools_policy": {
                "allow_search": bool(tp.get("allow_search", False)),
                "allow_human": bool(tp.get("allow_human", False)),
                "allow_python": bool(tp.get("allow_python", False)),
            }
        }

    # === Fallback: если оракул "забыл" задачи ===
    missing = [t["id"] for t in tasks if t["id"] not in resources]
    if missing:
        for tid in missing:
            role = next(t["role"] for t in tasks if t["id"] == tid)

            if role == "search":
                resources[tid] = {
                    "model": "openai/gpt-4o-mini",
                    "max_tokens": 900,
                    "temperature": 0.3,
                    "tools_policy": {"allow_search": True, "allow_human": False, "allow_python": False},
                }
            elif role == "document":
                # Если файлов много/тяжёлые — document лучше дать побольше токенов
                files_count = len(state.get("files") or [])
                doc_tokens = 1100 if files_count >= 2 else 900
                resources[tid] = {
                    "model": "openai/gpt-4o-mini",
                    "max_tokens": doc_tokens,
                    "temperature": 0.1,
                    "tools_policy": {"allow_search": False, "allow_human": False, "allow_python": False},
                }
            elif role == "extract":
                resources[tid] = {
                    "model": "openai/gpt-4o-mini",
                    "max_tokens": 900,
                    "temperature": 0.1,
                    "tools_policy": {"allow_search": False, "allow_human": False, "allow_python": False},
                }
            elif role == "reason":
                resources[tid] = {
                    "model": "openai/gpt-4o-mini",
                    "max_tokens": 1100,
                    "temperature": 0.2,
                    "tools_policy": {"allow_search": False, "allow_human": False, "allow_python": False},
                }
            elif role == "calc":
                resources[tid] = {
                    "model": "openai/gpt-4o-mini",
                    "max_tokens": 900,
                    "temperature": 0.0,
                    "tools_policy": {"allow_search": False, "allow_human": False, "allow_python": True},
                }
            elif role == "critic":
                resources[tid] = {
                    "model": "openai/gpt-4o-mini",
                    "max_tokens": 900,
                    "temperature": 0.1,
                    "tools_policy": {"allow_search": False, "allow_human": False, "allow_python": False},
                }
            else:  # answer
                resources[tid] = {
                    "model": "openai/gpt-4o-mini",
                    "max_tokens": 1400,
                    "temperature": 0.2,
                    "tools_policy": {"allow_search": False, "allow_human": False, "allow_python": False},
                }

    # === Если document-задача есть, а state.files нет — можно урезать ей токены (страховка) ===
    has_files = bool(state.get("files"))
    for tid, cfg in list(resources.items()):
        role = next((t["role"] for t in tasks if t["id"] == tid), None)
        if role == "document" and not has_files:
            cfg["max_tokens"] = min(cfg["max_tokens"], 400)

    # === (Опционально) соблюдаем общий бюджет токенов ===
    total_assigned = sum(int(r.get("max_tokens", 0)) for r in resources.values())
    if total_assigned > total_token_budget and total_assigned > 0:
        k = total_token_budget / total_assigned
        for tid in resources:
            resources[tid]["max_tokens"] = max(200, int(resources[tid]["max_tokens"] * k))

    state["resources"] = resources
    return state


In [15]:
import json

state = {
    "query": "Объясни, как работает мультиагентная система для решения бенчмарка GAIA"
}

# Planner
state = planner_node(state)

print("=== TASKS ===")
print(json.dumps(state["tasks"], ensure_ascii=False, indent=2))

=== TASKS ===
[
  {
    "id": "T1",
    "role": "search",
    "description": "найти источники о мультиагентных системах и бенчмарке GAIA",
    "depends_on": [],
    "priority": 1,
    "status": "pending",
    "result": null
  },
  {
    "id": "T2",
    "role": "extract",
    "description": "извлечь ключевые термины и определения из найденных источников",
    "depends_on": [
      "T1"
    ],
    "priority": 2,
    "status": "pending",
    "result": null
  },
  {
    "id": "T3",
    "role": "reason",
    "description": "построить логический вывод о том, как работает мультиагентная система для решения бенчмарка GAIA на основе извлеченных данных",
    "depends_on": [
      "T2"
    ],
    "priority": 3,
    "status": "pending",
    "result": null
  },
  {
    "id": "T4",
    "role": "answer",
    "description": "написать финальный ответ пользователю о работе мультиагентной системы для решения бенчмарка GAIA",
    "depends_on": [
      "T3"
    ],
    "priority": 4,
    "status": "pending"

In [6]:
# Oracle (LLM)
state = oracle_node(state)

print("\n=== ORACLE RESOURCES ===")
print(json.dumps(state["resources"], ensure_ascii=False, indent=2))


NameError: name 'state' is not defined

## select_task_node
Нода выбора следующей задачи

In [6]:
def select_task_node(state: GraphState) -> GraphState:
    tasks = state["tasks"]

    for t in tasks:
        if t["status"] != "pending":
            continue

        deps = t.get("depends_on", [])
        if not deps:
            # нет зависимостей — можно брать
            t["status"] = "in_progress"
            state["current_task_id"] = t["id"]
            return state

        # проверяем, что все зависимости выполнены
        all_done = True
        for dep_id in deps:
            dep_task = next(
                (x for x in tasks if x["id"] == dep_id),
                None
            )
            if dep_task is None or dep_task["status"] != "done":
                all_done = False
                break

        if all_done:
            t["status"] = "in_progress"
            state["current_task_id"] = t["id"]
            return state

    # если нет доступных задач
    state["current_task_id"] = None
    return state


## Router по роли задачи (conditional edges)

Функция, которая решает, в какую ноду идти дальше.

In [7]:
from langgraph.graph import END


def route_by_role(state: "GraphState") -> str:
    """
    Роутер LangGraph.
    По текущей задаче определяет, какой агент должен быть вызван.
    Поддерживает роль `document` для document_media_agent_node.
    """

    current_id = state.get("current_task_id")
    if current_id is None:
        return END

    # безопасно находим текущую задачу
    tasks = state.get("tasks", [])
    task = next((t for t in tasks if t.get("id") == current_id), None)
    if task is None:
        return END

    role = task.get("role")

    if role == "search":
        return "search_agent"

    elif role == "document":
        return "document_media_agent"

    elif role == "extract":
        return "extraction_agent"

    elif role == "reason":
        return "reasoning_agent"

    elif role == "calc":
        return "calculation_agent"

    elif role == "critic":
        return "critic_agent"

    elif role == "answer":
        return "answer_agent"

    # неизвестная роль → завершаем, но не падаем
    return END


# Агенты исполнители

In [8]:
# !pip install langchain-experimental

In [9]:
from langchain_community.tools import TavilySearchResults
from langchain_experimental.tools import PythonREPLTool  # pip install langchain-experimental

search_tool = TavilySearchResults(max_results=5, include_answer=False, include_raw_content=False)
python_tool = PythonREPLTool()

## SearchAgent

In [10]:
import json
from typing import Any, Dict, List
from langchain_core.messages import SystemMessage, HumanMessage


def _safe_get_task(state: GraphState, task_id: str) -> Dict[str, Any]:
    task = next((t for t in state["tasks"] if t["id"] == task_id), None)
    if task is None:
        raise KeyError(f"Task {task_id} not found in state['tasks']")
    return task


def search_agent_node(state: GraphState) -> GraphState:
    task_id = state["current_task_id"]
    if task_id is None:
        return state

    task = _safe_get_task(state, task_id)
    cfg = state["resources"].get(task_id, {})
    tools_policy = cfg.get("tools_policy", {"allow_search": True, "allow_human": False})

    # Гарантируем, что intermediate_results существует
    state.setdefault("intermediate_results", {})

    # === 1) Если поиск запрещён Оракулом — возвращаем "cannot_search" ===
    if not tools_policy.get("allow_search", True):
        result = {
            "status": "skipped",
            "reason": "Search tool is disabled by oracle tools_policy",
            "facts": [],
            "sources": [],
            "summary": ""
        }
        task["result"] = result
        task["status"] = "done"
        state["intermediate_results"][task_id] = result
        return state

    # === 2) Сходить в Tavily ===
    # TavilySearchResults обычно принимает {"query": "..."} и возвращает список результатов
    query = task["description"]

    try:
        tavily_results = search_tool.invoke({"query": query})
        # tavily_results обычно list[dict] с полями типа: title, url, content/answer/snippet
        if not isinstance(tavily_results, list):
            tavily_results = [tavily_results]
    except Exception as e:
        result = {
            "status": "error",
            "reason": f"Tavily search failed: {e}",
            "facts": [],
            "sources": [],
            "summary": ""
        }
        task["result"] = result
        task["status"] = "done"
        state["intermediate_results"][task_id] = result
        return state

    # Нормализуем источники (оставим title + url + краткий фрагмент)
    sources: List[Dict[str, str]] = []
    for r in tavily_results[:5]:
        title = (r.get("title") or "").strip()
        url = (r.get("url") or "").strip()
        snippet = (r.get("content") or r.get("answer") or "").strip()
        sources.append({"title": title, "url": url, "snippet": snippet[:600]})

    # === 3) LLM-синтез: извлечь факты + короткое резюме ===
    # Для search-агента LLM можно брать mini (как решит Оракул)
    llm = make_llm(
        model=cfg.get("model", "openai/gpt-4o-mini"),
        temperature=cfg.get("temperature", 0.2),
        max_tokens=cfg.get("max_tokens", 900),
        tools=[],  # здесь tools не нужны, поиск уже сделан
    )

    system_prompt = (
        "You are a search assistant. "
        "Your job is to extract verifiable facts from web search snippets and provide a short synthesis.\n"
        "Rules:\n"
        "1) Use only the provided sources.\n"
        "2) Output STRICT JSON.\n"
        "3) Provide 3-8 facts max, each fact should be short and checkable.\n"
        "4) Each fact must include a source_url field referring to one of the sources.\n"
    )

    user_payload = {
        "task_description": task["description"],
        "sources": sources,
        "required_output_schema": {
            "facts": [{"fact": "str", "source_url": "str"}],
            "summary": "str"
        }
    }

    resp = llm.invoke([
        SystemMessage(content=system_prompt),
        HumanMessage(content=json.dumps(user_payload, ensure_ascii=False)),
    ])

    try:
        parsed = json.loads(resp.content)
        facts = parsed.get("facts", [])
        summary = parsed.get("summary", "")
        if not isinstance(facts, list):
            facts = []
    except Exception:
        # fallback: если JSON сломался — просто сохраним сырьё
        facts = []
        summary = resp.content.strip()

    # === 4) Собираем итоговый результат ===
    result = {
        "status": "done",
        "query": query,
        "facts": facts,
        "sources": sources,
        "summary": summary,
    }

    task["result"] = result
    task["status"] = "done"
    state["intermediate_results"][task_id] = result
    return state


In [12]:
import json

state = {
    "current_task_id": "T1",
    "tasks": [
        {
            "id": "T1",
            "role": "search",
            "description": "What is the GAIA benchmark in artificial intelligence?",
            "status": "pending",
            "result": None,
        }
    ],
    "resources": {
        "T1": {
            "model": "openai/gpt-4o-mini",
            "max_tokens": 800,
            "temperature": 0.2,
            "tools_policy": {
                "allow_search": True,
                "allow_human": False
            }
        }
    },
    "intermediate_results": {}
}

# имитируем выбор задачи
state["tasks"][0]["status"] = "in_progress"

# вызываем search-agent
state = search_agent_node(state)

print("=== SEARCH AGENT RESULT ===")
print(json.dumps(state["intermediate_results"]["T1"], ensure_ascii=False, indent=2))


=== SEARCH AGENT RESULT ===
{
  "status": "done",
  "query": "What is the GAIA benchmark in artificial intelligence?",
  "facts": [
    {
      "fact": "GAIA stands for General AI Assistants benchmark.",
      "source_url": "https://towardsdatascience.com/gaia-the-llm-agent-benchmark-everyones-talking-about/"
    },
    {
      "fact": "GAIA was developed by researchers from Meta, Hugging Face, AutoGPT, and GenAI.",
      "source_url": "https://www.ninjatech.ai/product/benchmarks"
    },
    {
      "fact": "The benchmark consists of 466 questions designed to evaluate AI systems on practical tasks.",
      "source_url": "https://arxiv.org/pdf/2311.12983"
    },
    {
      "fact": "GAIA tests abilities such as reasoning, multi-modality handling, web browsing, and tool-use proficiency.",
      "source_url": "https://hal.cs.princeton.edu/gaia"
    },
    {
      "fact": "The questions in GAIA are crafted to reflect real-world scenarios and require precise answers.",
      "source_url": "

## CRITIC AGENT NODE

In [11]:
import json
from typing import Any, Dict, List
from langchain_core.messages import SystemMessage, HumanMessage


def critic_agent_node(state: GraphState) -> GraphState:
    task_id = state.get("current_task_id")
    if not task_id:
        return state

    tasks = state["tasks"]
    task = next((t for t in tasks if t["id"] == task_id), None)
    if task is None:
        raise KeyError(f"Task {task_id} not found")

    cfg = state["resources"].get(task_id, {})
    tools_policy = cfg.get("tools_policy", {"allow_search": False, "allow_human": False})

    state.setdefault("intermediate_results", {})

    # === 1) Собираем вход для критика ===
    # Обычно критик проверяет результаты поиска; но он может видеть и другие результаты.
    intermediate = state["intermediate_results"]

    # Вытащим "наиболее релевантный" search-результат:
    search_payloads: List[Dict[str, Any]] = []
    for k, v in intermediate.items():
        if isinstance(v, dict) and ("sources" in v or "facts" in v):
            search_payloads.append({"task_id": k, "payload": v})

    # Если ничего нет — критик все равно может оценить ситуацию (например, поиск пропущен)
    critic_input = {
        "critic_task_description": task.get("description", ""),
        "available_intermediate_results": intermediate,
        "search_like_payloads": search_payloads,
        "notes": (
            "If search_like_payloads is empty, explain what is missing and what should be done next."
        ),
    }

    # === 2) (Опционально) Разрешить критику делать дополнительный поиск ===
    # В базовой версии я НЕ делаю автоматический tavily-вызов внутри критика,
    # чтобы не смешивать роли. Если хочешь — можно добавить.
    # Сейчас критик просто выдаёт "follow_up_queries" для SearchAgent.
    allow_search = bool(tools_policy.get("allow_search", False))

    # === 3) Вызываем LLM критика ===
    llm = make_llm(
        model=cfg.get("model", "openai/gpt-4o-mini"),
        temperature=cfg.get("temperature", 0.1),
        max_tokens=cfg.get("max_tokens", 900),
        tools=[],  # критик обычно без тулов; он анализирует уже собранное
    )

    system_prompt = (
        "You are a Critic agent in a multi-agent system.\n"
        "Your job is to evaluate the quality and reliability of intermediate results.\n\n"
        "Focus on:\n"
        "- verifiability of claims\n"
        "- missing sources / weak evidence\n"
        "- contradictions / inconsistencies\n"
        "- relevance to the original task\n"
        "- whether further search is required\n\n"
        "Output STRICT JSON only, no extra text.\n"
        "If you suggest further search, provide follow_up_queries.\n"
        "Be concise and actionable.\n"
    )

    user_prompt = {
        "task": task,
        "allow_search_for_critic": allow_search,
        "critic_input": critic_input,
        "required_output_schema": {
            "overall_verdict": "one of: ok | needs_more_evidence | incorrect_or_inconsistent",
            "issues": [
                {
                    "type": "one of: unverifiable | contradiction | missing_source | irrelevant | unclear",
                    "description": "str",
                    "severity": "one of: low | medium | high",
                    "related_fact": "optional str",
                }
            ],
            "confirmed_points": ["str"],
            "risk_notes": ["str"],
            "follow_up_queries": ["str"],
            "recommendations_for_answer_agent": ["str"]
        }
    }

    resp = llm.invoke([
        SystemMessage(content=system_prompt),
        HumanMessage(content=json.dumps(user_prompt, ensure_ascii=False)),
    ])

    # === 4) Парсим JSON + fallback ===
    raw = resp.content
    try:
        parsed = json.loads(raw)
    except Exception:
        # fallback: если LLM сорвалась — сохраним как текст, но в совместимом контейнере
        parsed = {
            "overall_verdict": "needs_more_evidence",
            "issues": [
                {
                    "type": "unclear",
                    "description": "LLM returned non-JSON output. See raw_text.",
                    "severity": "medium",
                    "related_fact": None
                }
            ],
            "confirmed_points": [],
            "risk_notes": ["Non-JSON critic output; treat with caution."],
            "follow_up_queries": [],
            "recommendations_for_answer_agent": ["Use only well-sourced claims; consider re-running search."],
            "_raw_text": raw
        }

    # === 5) Записываем результат ===
    task["result"] = parsed
    task["status"] = "done"
    state["intermediate_results"][task_id] = parsed

    return state


In [14]:
import json

# 1) state
state = {
    "current_task_id": "T2",
    "tasks": [
        {
            "id": "T1",
            "role": "search",
            "description": "What is the GAIA benchmark in artificial intelligence?",
            "status": "done",
            "result": None,
        },
        {
            "id": "T2",
            "role": "critic",
            "description": "Check the correctness and reliability of the search results",
            "status": "pending",
            "result": None,
        }
    ],
    "resources": {
        "T2": {
            "model": "openai/gpt-4o-mini",
            "max_tokens": 900,
            "temperature": 0.1,
            "tools_policy": {
                "allow_search": False,
                "allow_human": False
            }
        }
    },
    "intermediate_results": {
        "T1": {
            "status": "done",
            "facts": [
                {
                    "fact": "GAIA is a benchmark designed to evaluate multi-step reasoning and tool use in AI systems.",
                    "source_url": "https://arxiv.org/abs/2311.12983"
                }
            ],
            "sources": [
                {
                    "title": "GAIA: A Benchmark for General AI Assistants",
                    "url": "https://arxiv.org/abs/2311.12983",
                    "snippet": "GAIA is a benchmark that evaluates general AI assistants on real-world tasks."
                }
            ],
            "summary": "GAIA evaluates reasoning and tool usage in AI agents."
        }
    }
}

# 2) имитируем выбор задачи
state["tasks"][1]["status"] = "in_progress"

# 3) вызываем критика
state = critic_agent_node(state)

# 4) смотрим результат
print("=== CRITIC AGENT RESULT ===")
print(json.dumps(state["intermediate_results"]["T2"], ensure_ascii=False, indent=2))


=== CRITIC AGENT RESULT ===
{
  "overall_verdict": "ok",
  "issues": [],
  "confirmed_points": [
    "GAIA is a benchmark designed to evaluate multi-step reasoning and tool use in AI systems.",
    "The source provided is credible and directly supports the claims made."
  ],
  "risk_notes": [],
  "follow_up_queries": [],
  "recommendations_for_answer_agent": []
}


## ANSWER AGENT NODE

In [12]:
import json
import re
from typing import Any, Dict, Optional, List, Tuple
from langchain_core.messages import SystemMessage, HumanMessage


def _strip_fences(text: str) -> str:
    if not isinstance(text, str):
        return str(text)
    t = text.strip()
    # ```json ... ``` or ``` ... ```
    m = re.search(r"```(?:json)?\s*(.*?)\s*```", t, flags=re.DOTALL | re.IGNORECASE)
    return (m.group(1).strip() if m else t)


def _safe_json_loads(text: str) -> Optional[dict]:
    try:
        return json.loads(text)
    except Exception:
        return None


def _normalize_space(s: str) -> str:
    return re.sub(r"\s+", " ", (s or "").strip())


def _guess_expected_answer_type(question: str) -> str:
    q = (question or "").lower()
    # very lightweight heuristics
    if any(k in q for k in ["how many", "what number", "number of", "count of"]):
        return "number"
    if any(k in q for k in ["zip code", "postal code"]):
        return "zip_codes"
    if any(k in q for k in ["mm/dd/yy", "mm/dd/yyyy", "date"]):
        return "date"
    if any(k in q for k in ["url", "website", "official website"]):
        return "url"
    if any(k in q for k in ["which of these words", "one word", "word is used", "name of the symbol", "symbol name"]):
        return "single_token"
    return "text"


def _extract_first_number(text: str) -> Optional[str]:
    if not text:
        return None
    m = re.search(r"[-+]?\d+(?:\.\d+)?", text.replace(",", ""))
    return m.group(0) if m else None


def _extract_zip_codes(text: str) -> Optional[str]:
    if not text:
        return None
    zips = re.findall(r"\b\d{5}\b", text)
    if not zips:
        return None
    # GAIA обычно принимает "12345, 67890" или "12345 67890"
    return ", ".join(zips)


def _extract_url(text: str) -> Optional[str]:
    if not text:
        return None
    m = re.search(r"(https?://[^\s\)\]\}]+)", text)
    return m.group(1).rstrip(".,;") if m else None


def _extract_single_token(text: str) -> Optional[str]:
    if not text:
        return None
    # take first "word-like" token (letters/digits/hyphen)
    m = re.search(r"\b([A-Za-z0-9][A-Za-z0-9\-\']*)\b", text.strip())
    return m.group(1) if m else None


def _compact_documents_for_llm(documents: Any, max_chars_per_file: int = 2500, max_files: int = 6) -> List[Dict[str, Any]]:
    """
    Make documents payload small & robust:
    - keeps ok/kind/ext/name
    - includes content_json if present
    - otherwise includes truncated content string
    """
    if not isinstance(documents, dict):
        return []

    out = []
    for i, (fid, rec) in enumerate(documents.items()):
        if i >= max_files:
            break
        if not isinstance(rec, dict):
            continue

        content_json = rec.get("content_json", None)
        content = rec.get("content", None)

        item = {
            "file_id": fid,
            "name": rec.get("name"),
            "ext": rec.get("ext"),
            "mime": rec.get("mime"),
            "kind": rec.get("kind"),
            "ok": rec.get("ok"),
            "error": rec.get("error"),
            "content_json": content_json,
        }

        if content_json is None:
            s = _strip_fences(content if isinstance(content, str) else str(content))
            item["content_text"] = s[:max_chars_per_file]

        out.append(item)

    return out


def answer_agent_node(state: "GraphState") -> "GraphState":
    """
    Улучшенный AnswerAgent:
    - Видит результаты document_media_agent_node (intermediate_results["documents"])
    - Видит результаты search/extract/reason/calc/critic
    - Не спрашивает документы повторно, если они уже есть
    - Делает пост-нормализацию ответа под GAIA (число/URL/zip/single-token)
    - Сохраняет структурированный результат в task + intermediate_results
    """
    task_id = state.get("current_task_id")
    if not task_id:
        return state

    tasks = state.get("tasks", [])
    task = next((t for t in tasks if t.get("id") == task_id), None)
    if task is None:
        raise KeyError(f"Task {task_id} not found")

    cfg = (state.get("resources") or {}).get(task_id, {})
    tools_policy = cfg.get("tools_policy", {"allow_search": False, "allow_human": False, "allow_python": False})

    state.setdefault("intermediate_results", {})
    intermediate: Dict[str, Any] = state["intermediate_results"]

    # -------------------------
    # 1) Собираем контекст
    # -------------------------

    # Search payloads
    search_payloads: List[Dict[str, Any]] = []
    for k, v in intermediate.items():
        if isinstance(v, dict) and ("facts" in v or "sources" in v):
            search_payloads.append({"task_id": k, "payload": v})

    # Critic payload
    critic_payload: Optional[Dict[str, Any]] = None
    for k, v in intermediate.items():
        if isinstance(v, dict) and "overall_verdict" in v and "issues" in v:
            critic_payload = v
            break

    critic_verdict = "needs_more_evidence"
    if isinstance(critic_payload, dict):
        critic_verdict = critic_payload.get("overall_verdict", "needs_more_evidence")

    # Documents from document_media_agent_node
    documents_raw = intermediate.get("documents")
    documents_compact = _compact_documents_for_llm(documents_raw)

    has_docs_ok = any(d.get("ok") for d in documents_compact) if documents_compact else False

    # Also include upstream task results (extract/reason/calc etc.)
    # but keep them small: only dict-like and not the big documents bucket.
    upstream_results: List[Dict[str, Any]] = []
    for t in tasks:
        tid = t.get("id")
        if not tid or tid == task_id:
            continue
        if tid not in intermediate:
            continue
        if tid == "documents":
            continue
        v = intermediate.get(tid)
        if isinstance(v, (dict, list, str, int, float, bool)) and v is not None:
            # trim huge strings
            if isinstance(v, str) and len(v) > 3000:
                v = v[:3000]
            upstream_results.append({"task_id": tid, "role": t.get("role"), "result": v})

    user_query = state.get("query", "")
    expected_type = _guess_expected_answer_type(user_query)

    # -------------------------
    # 2) Payload для LLM
    # -------------------------
    payload = {
        "user_query": user_query,
        "answer_task_description": task.get("description", ""),
        "expected_answer_type_hint": expected_type,
        "documents_present": bool(documents_compact),
        "documents_ok_present": has_docs_ok,
        "documents": documents_compact,          # <-- КЛЮЧЕВОЕ: теперь Answer видит документы
        "search_payloads": search_payloads,
        "upstream_results": upstream_results,
        "critic": critic_payload,
        "critic_verdict": critic_verdict,
        "oracle_tools_policy": tools_policy,
        "required_output_schema": {
            "final_answer": "str",
            "final_answer_strict": "str  # MUST be minimal and machine-checkable",
            "used_evidence": ["str"],
            "confidence": "low|medium|high",
            "follow_up_needed": "bool",
            "follow_up_queries": ["str"]
        }
    }

    llm = make_llm(
        model=cfg.get("model", "openai/gpt-4o-mini"),
        temperature=cfg.get("temperature", 0.2),
        max_tokens=cfg.get("max_tokens", 1400),
        tools=[],
    )

    system_prompt = (
        "You are the Final Answer Agent for the GAIA benchmark.\n"
        "You MUST answer using the provided evidence.\n"
        "Evidence sources, in priority order:\n"
        "1) DOCUMENTS (tables/images/PDF text) if present and ok\n"
        "2) Upstream agent results (extract/reason/calc)\n"
        "3) Search facts + sources\n"
        "4) Critic verdict/constraints\n\n"
        "Critical rules:\n"
        "- If documents are present, do NOT ask the user to provide them again.\n"
        "- If the critic says 'incorrect', do not assert disputed claims.\n"
        "- Produce TWO fields:\n"
        "  (a) final_answer: natural short answer (can be a phrase)\n"
        "  (b) final_answer_strict: the minimal machine-checkable answer for scoring.\n"
        "- final_answer_strict MUST follow the expected type hint:\n"
        "  * number -> only digits (or decimal), no words\n"
        "  * url -> only the URL\n"
        "  * zip_codes -> comma-separated 5-digit codes\n"
        "  * single_token -> one token/word only\n"
        "  * text -> a short phrase, no extra commentary\n"
        "- Output STRICT JSON only.\n"
    )

    resp = llm.invoke([
        SystemMessage(content=system_prompt),
        HumanMessage(content=json.dumps(payload, ensure_ascii=False)),
    ])

    raw = resp.content
    raw_clean = _strip_fences(raw)

    parsed = _safe_json_loads(raw_clean)
    if not isinstance(parsed, dict):
        # fallback
        parsed = {
            "final_answer": raw_clean.strip(),
            "final_answer_strict": raw_clean.strip(),
            "used_evidence": [],
            "confidence": "low",
            "follow_up_needed": True,
            "follow_up_queries": [],
            "_raw_text": raw,
        }

    # -------------------------
    # 3) Пост-нормализация под GAIA (самая важная часть!)
    # -------------------------
    final_answer = _normalize_space(str(parsed.get("final_answer", "")))
    strict = _normalize_space(str(parsed.get("final_answer_strict", "")))

    # If LLM forgot strict, derive it.
    if not strict:
        strict = final_answer

    # Enforce strict format by hint
    if expected_type == "number":
        strict_num = _extract_first_number(strict) or _extract_first_number(final_answer)
        if strict_num is not None:
            strict = strict_num

    elif expected_type == "zip_codes":
        strict_zip = _extract_zip_codes(strict) or _extract_zip_codes(final_answer)
        if strict_zip is not None:
            strict = strict_zip

    elif expected_type == "url":
        strict_url = _extract_url(strict) or _extract_url(final_answer)
        if strict_url is not None:
            strict = strict_url

    elif expected_type == "single_token":
        tok = _extract_single_token(strict) or _extract_single_token(final_answer)
        if tok is not None:
            strict = tok

    # final_answer_strict should not be huge
    if len(strict) > 500:
        strict = strict[:500]

    # -------------------------
    # 4) Запись результата
    # -------------------------
    out = {
        "final_answer": final_answer,
        "final_answer_strict": strict,
        "used_evidence": parsed.get("used_evidence", parsed.get("used_facts", [])) or [],
        "confidence": parsed.get("confidence", "low"),
        "follow_up_needed": bool(parsed.get("follow_up_needed", False)),
        "follow_up_queries": parsed.get("follow_up_queries", []) or [],
        "critic_verdict": critic_verdict,
        "documents_used": bool(documents_compact),
        "documents_ok_present": has_docs_ok,
        "_raw_model_output": raw if cfg.get("debug_save_raw", False) else None,
    }

    task["result"] = out
    task["status"] = "done"
    state["intermediate_results"][task_id] = out

    # ВАЖНО: для GAIA скоринга чаще удобнее strict
    state["final_answer"] = out["final_answer_strict"]

    return state


## Extraction / Parsing Agent

In [13]:
import json
from langchain_core.messages import SystemMessage, HumanMessage

def extraction_agent_node(state: GraphState) -> GraphState:
    task_id = state.get("current_task_id")
    if not task_id:
        return state

    task = next(t for t in state["tasks"] if t["id"] == task_id)
    cfg = state["resources"].get(task_id, {})
    state.setdefault("intermediate_results", {})

    # Берём всё, что накопили агенты (обычно после Search)
    intermediate = state["intermediate_results"]

    llm = make_llm(
        model=cfg.get("model", "openai/gpt-4o-mini"),
        temperature=cfg.get("temperature", 0.0),
        max_tokens=cfg.get("max_tokens", 900),
        tools=[],
    )

    system_prompt = (
        "You are an Extraction/Parsing agent.\n"
        "Extract structured information from the provided intermediate results.\n"
        "Output STRICT JSON only.\n"
    )

    user_payload = {
        "task_description": task["description"],
        "intermediate_results": intermediate,
        "required_output_schema": {
            "entities": [{"name": "str", "type": "str", "evidence": "str"}],
            "key_points": ["str"],
            "numbers_and_units": [{"value": "str", "unit": "str", "context": "str"}],
            "definitions": [{"term": "str", "definition": "str", "source_url": "optional str"}],
            "open_questions": ["str"]
        }
    }

    resp = llm.invoke([
        SystemMessage(content=system_prompt),
        HumanMessage(content=json.dumps(user_payload, ensure_ascii=False))
    ])

    try:
        parsed = json.loads(resp.content)
    except Exception:
        parsed = {"_raw_text": resp.content}

    task["result"] = parsed
    task["status"] = "done"
    state["intermediate_results"][task_id] = parsed
    return state


## Reasoning Agent

In [14]:
import json
from langchain_core.messages import SystemMessage, HumanMessage

def reasoning_agent_node(state: GraphState) -> GraphState:
    task_id = state.get("current_task_id")
    if not task_id:
        return state

    task = next(t for t in state["tasks"] if t["id"] == task_id)
    cfg = state["resources"].get(task_id, {})
    state.setdefault("intermediate_results", {})

    intermediate = state["intermediate_results"]

    llm = make_llm(
        model=cfg.get("model", "openai/gpt-4o"),
        temperature=cfg.get("temperature", 0.2),
        max_tokens=cfg.get("max_tokens", 1200),
        tools=[],
    )

    system_prompt = (
        "You are a Reasoning agent.\n"
        "Use the extracted facts/structures to derive conclusions and propose a coherent solution.\n"
        "Do not invent facts. If evidence is missing, state assumptions explicitly.\n"
        "Output STRICT JSON only.\n"
    )

    user_payload = {
        "task_description": task["description"],
        "intermediate_results": intermediate,
        "required_output_schema": {
            "assumptions": ["str"],
            "reasoning_steps": ["str"],
            "conclusions": ["str"],
            "proposed_answer_outline": ["str"],
            "follow_up_needed": "bool",
            "follow_up_queries": ["str"]
        }
    }

    resp = llm.invoke([
        SystemMessage(content=system_prompt),
        HumanMessage(content=json.dumps(user_payload, ensure_ascii=False))
    ])

    try:
        parsed = json.loads(resp.content)
    except Exception:
        parsed = {"_raw_text": resp.content, "follow_up_needed": True}

    task["result"] = parsed
    task["status"] = "done"
    state["intermediate_results"][task_id] = parsed
    return state


## Calculation Agent (Code agent)

In [15]:
import json
from langchain_core.messages import SystemMessage, HumanMessage

def calculation_agent_node(state: GraphState) -> GraphState:
    task_id = state.get("current_task_id")
    if not task_id:
        return state

    task = next(t for t in state["tasks"] if t["id"] == task_id)
    cfg = state["resources"].get(task_id, {})
    state.setdefault("intermediate_results", {})

    llm = make_llm(
        model=cfg.get("model", "openai/gpt-4o-mini"),
        temperature=cfg.get("temperature", 0.0),
        max_tokens=cfg.get("max_tokens", 900),
        tools=[],  # код будем выполнять явно через python_tool
    )

    system_prompt = (
        "You are a Calculation/Code agent.\n"
        "Generate Python code to compute the required result.\n"
        "Rules:\n"
        "- Use pure Python.\n"
        "- Print final numeric answers clearly.\n"
        "- Output STRICT JSON with a 'python_code' field only.\n"
    )

    user_payload = {
        "task_description": task["description"],
        "intermediate_results": state["intermediate_results"],
        "required_output_schema": {
            "python_code": "str"
        }
    }

    resp = llm.invoke([
        SystemMessage(content=system_prompt),
        HumanMessage(content=json.dumps(user_payload, ensure_ascii=False))
    ])

    try:
        parsed = json.loads(resp.content)
        code = parsed.get("python_code", "")
    except Exception:
        code = ""
        parsed = {"python_code": "", "_raw_text": resp.content}

    # Выполняем код
    output = ""
    error = None
    if code.strip():
        try:
            output = python_tool.run(code)
        except Exception as e:
            error = str(e)

    result = {
        "python_code": code,
        "output": output,
        "error": error
    }

    task["result"] = result
    task["status"] = "done"
    state["intermediate_results"][task_id] = result
    return state


## Document_media_agent_node

In [16]:
os.environ["OPENROUTER_API_KEY"] = ""

# document_media_agent_node.py  (SYNC node, no async/await in caller)
import asyncio
from pathlib import Path
from typing import Dict, Any, List

from automas.mcp_client_file_analysis import extract_text as mcp_extract_text
from automas.mcp_client_media_analysis import (
    analyze_image as mcp_analyze_image,
    transcribe_audio as mcp_transcribe_audio,
    analyze_video as mcp_analyze_video,
)

# ---- Типы файлов ----
DOC_EXTS = {".pdf", ".docx", ".pptx", ".xlsx", ".zip", ".txt", ".md", ".csv", ".json"}
IMG_EXTS = {".png", ".jpg", ".jpeg", ".webp", ".bmp", ".tif", ".tiff", ".gif"}
AUDIO_EXTS = {".mp3", ".wav", ".flac", ".ogg", ".oga", ".aiff", ".aac", ".m4a", ".wma", ".opus"}
VIDEO_EXTS = {".mp4", ".mov", ".webm", ".mpeg", ".mpg", ".mkv"}


def _get_ext(path: str) -> str:
    return Path(path).suffix.lower()


def _pick_file_id(f: Dict[str, Any], idx: int) -> str:
    return str(f.get("id") or f.get("name") or f.get("path") or f"file_{idx}")


def _default_image_prompt(question: str) -> str:
    return (
        "You are a perception/OCR module for a benchmark.\n"
        "Your job: ONLY extract information from the image. DO NOT solve the task.\n\n"
        "Output STRICT JSON with the following schema:\n"
        "{\n"
        '  "numbers": [{"value": <number>, "near": "<what edge/segment it is next to>", "notes": "<short>"}],\n'
        '  "segments": [{"orientation": "horizontal|vertical|unknown", "length": <number>, "near": "<description>"}],\n'
        '  "shape_description": "<short factual description of the polygon/layout>",\n'
        '  "assumptions": ["<only if necessary: unreadable/uncertain parts>"]\n'
        "}\n\n"
        "Rules:\n"
        "- No area computation.\n"
        "- No missing-length inference.\n"
        "- No coordinate guesses.\n"
        "- No reasoning steps.\n"
        "- If unclear, put it into assumptions.\n\n"
        f"Task question (context only): {question}"
    )


def _default_audio_prompt(question: str) -> str:
    return (
        "You are an audio transcription module for a benchmark.\n"
        "Your job: ONLY transcribe the audio as accurately as possible.\n"
        "Return ONLY the transcript (no summary, no answer).\n"
        "If multiple speakers, mark them as Speaker 1 / Speaker 2.\n\n"
        f"Task question (context only): {question}"
    )


def _default_video_prompt(question: str) -> str:
    return (
        "You are a video perception module for a benchmark.\n"
        "Your job: ONLY extract information from the video. DO NOT solve the task.\n"
        "Return a structured factual report:\n"
        "1) Scene-by-scene brief description.\n"
        "2) Any visible text (verbatim).\n"
        "3) Key objects/actions relevant to the question.\n"
        "Do NOT compute, infer missing values, or provide a final answer.\n\n"
        f"Task question (context only): {question}"
    )


def _looks_like_solution(text: str) -> bool:
    if not text:
        return False
    t = str(text).lower()
    bad = ["total area", "area calculation", "square units", "final answer", "therefore", "hence"]
    return any(x in t for x in bad)


def _run_async_blocking(awaitable):
    """
    Запуск awaitable из синхронного кода.
    В Jupyter часто уже есть активный event loop — тогда используем nest_asyncio.
    """
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None

    if loop and loop.is_running():
        import nest_asyncio
        nest_asyncio.apply()
        return loop.run_until_complete(awaitable)
    else:
        return asyncio.run(awaitable)


async def _resolve_awaitables(x, timeout: int, max_depth: int = 3):
    """
    Некоторые MCP-обёртки могут вернуть coroutine внутри coroutine.
    Разворачиваем до обычного объекта.
    """
    cur = x
    for _ in range(max_depth):
        if asyncio.iscoroutine(cur) or asyncio.isfuture(cur):
            cur = await asyncio.wait_for(cur, timeout=timeout)
        else:
            break
    return cur


async def _call_mcp(fn, *args, timeout: int = 180, max_depth: int = 3, **kwargs):
    res = fn(*args, **kwargs)         # может быть sync или async
    res = await _resolve_awaitables(res, timeout=timeout, max_depth=max_depth)
    return res


def document_media_agent_node(state: "GraphState") -> "GraphState":
    """
    SYNC LangGraph node в стиле твоих агентов:
    - берёт текущую задачу (current_task_id)
    - читает файлы из state["files"]
    - кладёт результаты в:
        state["intermediate_results"]["documents"][file_id]  (подготовка для extract)
        state["intermediate_results"][task_id]               (результат именно task)
        task["result"], task["status"]="done"
    """

    task_id = state.get("current_task_id")
    if not task_id:
        return state

    task = next(t for t in state.get("tasks", []) if t.get("id") == task_id)
    cfg = (state.get("resources") or {}).get(task_id, {})  # сейчас тут мало что нужно, но оставим совместимость

    state.setdefault("intermediate_results", {})
    docs_slot = state["intermediate_results"].setdefault("documents", {})

    files: List[Dict[str, Any]] = state.get("files") or []
    question: str = state.get("query") or ""

    # Таймауты (можно тюнить через resources, если захочешь)
    DOC_TIMEOUT = int(cfg.get("doc_timeout", 180))
    MEDIA_TIMEOUT = int(cfg.get("media_timeout", 180))

    IMAGE_MAX_TOKENS = int(cfg.get("image_max_tokens", 900))
    AUDIO_MAX_TOKENS = int(cfg.get("audio_max_tokens", 2048))
    VIDEO_MAX_TOKENS = int(cfg.get("video_max_tokens", 2048))

    if not files:
        result = {"ok": True, "note": "No attached files.", "files": []}
        task["result"] = result
        task["status"] = "done"
        state["intermediate_results"][task_id] = result
        return state

    processed: List[Dict[str, Any]] = []

    for idx, f in enumerate(files):
        fid = _pick_file_id(f, idx)
        path = str(f.get("path") or "")
        name = str(f.get("name") or Path(path).name)
        mime = str(f.get("mime") or "").lower()
        ext = _get_ext(path)

        # cache: не перечитываем ok
        if fid in docs_slot and isinstance(docs_slot[fid], dict) and docs_slot[fid].get("ok"):
            processed.append(docs_slot[fid])
            continue

        record: Dict[str, Any] = {
            "id": fid,
            "name": name,
            "path": path,
            "mime": mime,
            "ext": ext,
            "ok": False,
            "kind": None,
            "content": None,
            "error": None,
        }

        try:
            # ---- Document ----
            if ext in DOC_EXTS or mime in {
                "application/pdf",
                "application/zip",
                "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
                "application/vnd.openxmlformats-officedocument.presentationml.presentation",
                "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
                "text/plain",
            }:
                record["kind"] = "document"
                text = _run_async_blocking(_call_mcp(mcp_extract_text, path, timeout=DOC_TIMEOUT))
                record["content"] = text
                record["ok"] = True

            # ---- Image ----
            elif ext in IMG_EXTS or mime.startswith("image/"):
                record["kind"] = "image"
                prompt = f.get("prompt") or _default_image_prompt(question)

                analysis = _run_async_blocking(_call_mcp(
                    mcp_analyze_image,
                    path,
                    prompt=prompt,
                    max_tokens=IMAGE_MAX_TOKENS,
                    timeout=MEDIA_TIMEOUT,
                ))

                if _looks_like_solution(analysis):
                    analysis = _run_async_blocking(_call_mcp(
                        mcp_analyze_image,
                        path,
                        prompt=_default_image_prompt(question),
                        max_tokens=IMAGE_MAX_TOKENS,
                        timeout=MEDIA_TIMEOUT,
                    ))

                record["content"] = analysis
                record["ok"] = True

            # ---- Audio ----
            elif ext in AUDIO_EXTS or mime.startswith("audio/"):
                record["kind"] = "audio"
                prompt = f.get("prompt") or _default_audio_prompt(question)
                transcript = _run_async_blocking(_call_mcp(
                    mcp_transcribe_audio,
                    path,
                    prompt=prompt,
                    max_tokens=AUDIO_MAX_TOKENS,
                    timeout=MEDIA_TIMEOUT,
                ))
                record["content"] = transcript
                record["ok"] = True

            # ---- Video ----
            elif ext in VIDEO_EXTS or mime.startswith("video/"):
                record["kind"] = "video"
                prompt = f.get("prompt") or _default_video_prompt(question)
                analysis = _run_async_blocking(_call_mcp(
                    mcp_analyze_video,
                    path,
                    prompt=prompt,
                    max_tokens=VIDEO_MAX_TOKENS,
                    timeout=MEDIA_TIMEOUT + 30,
                ))
                record["content"] = analysis
                record["ok"] = True

            else:
                record["kind"] = "unknown"
                record["error"] = f"Unsupported file type: ext={ext}, mime={mime}"

        except Exception as e:
            record["error"] = str(e)

        docs_slot[fid] = record
        processed.append(record)

    # Результат именно для текущей task (document)
    result = {
        "ok": True,
        "files_count": len(files),
        "processed": processed,
    }

    task["result"] = result
    task["status"] = "done"
    state["intermediate_results"][task_id] = result

    return state


In [30]:
import json
from pathlib import Path

# === Пути ===
PROJECT_ROOT = Path(r"C:/Users/oklad/Desktop/LLM_Games").resolve()
GAIA_FILES_ROOT = PROJECT_ROOT / "workspace" / "gaia_files"

IMG = GAIA_FILES_ROOT / "2023/validation/6359a0b1-8f7b-499b-9336-840f9ab90688.png"
assert IMG.exists(), f"Image not found: {IMG}"

# === Минимальный state под НОВУЮ sync-версию document_media_agent_node ===
# ВАЖНО: теперь нода работает как "обычный агент" и ожидает current_task_id + tasks (+ resources опционально)
state = {
    "gaia_task_id": "6359a0b1-8f7b-499b-9336-840f9ab90688",
    "query": (
        "What is the area of the green polygon in the attached file? "
        "The numbers in purple represent the lengths of the side they are next to."
    ),
    "files": [
        {
            "id": "f1",
            "name": IMG.name,
            "path": str(IMG),
            "mime": "image/png",
        }
    ],

    # поля, которые ожидают "агенты" в твоём графе
    "tasks": [
        {
            "id": "TDOC",
            "role": "document",
            "description": "Parse attached files and extract structured info (no solving).",
            "depends_on": [],
            "priority": 1,
            "status": "pending",
            "result": None,
        }
    ],
    "current_task_id": "TDOC",
    "resources": {
        # можно оставить пустым, но покажу дефолты
        "TDOC": {
            "model": "openai/gpt-4o-mini",
            "max_tokens": 900,
            "temperature": 0.1,
            "tools_policy": {"allow_search": False, "allow_human": False, "allow_python": False},
            # опциональные таймауты/лимиты для ноды (если хочешь тюнить)
            # "doc_timeout": 180,
            # "media_timeout": 180,
            # "image_max_tokens": 900,
            # "audio_max_tokens": 2048,
            # "video_max_tokens": 2048,
        }
    },
    "intermediate_results": {},
    "final_answer": None,
}

# === Запуск sync-ноды (БЕЗ await) ===
state = document_media_agent_node(state)

# === Проверка результата ===
print("=== DOCUMENT/MEDIA AGENT RESULT (documents) ===")
print(json.dumps(state["intermediate_results"]["documents"], ensure_ascii=False, indent=2, default=str))

doc = state["intermediate_results"]["documents"]["f1"]

print("\n--- SUMMARY ---")
print("ok:", doc["ok"])
print("kind:", doc["kind"])
print("error:", doc["error"])

print("\n--- CONTENT (first 1500 chars) ---")
print((doc["content"] or "")[:1500])

print("\n=== TASK RESULT (TDOC) ===")
print(json.dumps(state["intermediate_results"]["TDOC"], ensure_ascii=False, indent=2, default=str))


=== DOCUMENT/MEDIA AGENT RESULT (documents) ===
{
  "f1": {
    "id": "f1",
    "name": "6359a0b1-8f7b-499b-9336-840f9ab90688.png",
    "path": "C:\\Users\\oklad\\Desktop\\LLM_Games\\workspace\\gaia_files\\2023\\validation\\6359a0b1-8f7b-499b-9336-840f9ab90688.png",
    "mime": "image/png",
    "ext": ".png",
    "ok": true,
    "kind": "image",
    "content": "```json\n{\n  \"numbers\": [\n    {\"value\": 10, \"near\": \"top edge\", \"notes\": \"purple\"},\n    {\"value\": 6, \"near\": \"left edge\", \"notes\": \"purple\"},\n    {\"value\": 1, \"near\": \"bottom left corner\", \"notes\": \"purple\"},\n    {\"value\": 4, \"near\": \"horizontal segment inside\", \"notes\": \"purple\"},\n    {\"value\": 8, \"near\": \"horizontal segment inside\", \"notes\": \"purple\"},\n    {\"value\": 6, \"near\": \"vertical segment inside\", \"notes\": \"purple\"},\n    {\"value\": 1.5, \"near\": \"vertical segment inside\", \"notes\": \"purple\"},\n    {\"value\": 6, \"near\": \"vertical segment insi

# Сборка графа

Нода завершения (finish_node)

In [17]:
def finish_node(state: GraphState) -> GraphState:
    answer_task = next(t for t in state["tasks"] if t["role"] == "answer")
    state["final_answer"] = answer_task["result"]
    return state

Сборка графа

In [18]:
from langgraph.graph import StateGraph, END

graph = StateGraph(GraphState)

# --- core pipeline ---
graph.add_node("planner", planner_node)
graph.add_node("oracle", oracle_node)
graph.add_node("select_task", select_task_node)

# --- executors (agents) ---
graph.add_node("search_agent", search_agent_node)
graph.add_node("document_media_agent", document_media_agent_node)   # <-- NEW
graph.add_node("extraction_agent", extraction_agent_node)
graph.add_node("reasoning_agent", reasoning_agent_node)
graph.add_node("calculation_agent", calculation_agent_node)
graph.add_node("critic_agent", critic_agent_node)
graph.add_node("answer_agent", answer_agent_node)

graph.add_node("finish", finish_node)

# entry: planner -> oracle -> select_task
graph.set_entry_point("planner")
graph.add_edge("planner", "oracle")
graph.add_edge("oracle", "select_task")

# conditional routing after task selection
graph.add_conditional_edges(
    "select_task",
    route_by_role,  # должен уметь вернуть "document_media_agent" для role=="document"
    {
        "search_agent": "search_agent",
        "document_media_agent": "document_media_agent",   # <-- NEW
        "extraction_agent": "extraction_agent",
        "reasoning_agent": "reasoning_agent",
        "calculation_agent": "calculation_agent",
        "critic_agent": "critic_agent",
        "answer_agent": "answer_agent",
        END: "finish",  # если route_by_role возвращает END
        "finish": "finish",  # на случай если роутер возвращает строку "finish"
    },
)

# loop back after each agent to select next task
graph.add_edge("search_agent", "select_task")
graph.add_edge("document_media_agent", "select_task")     # <-- NEW
graph.add_edge("extraction_agent", "select_task")
graph.add_edge("reasoning_agent", "select_task")
graph.add_edge("calculation_agent", "select_task")
graph.add_edge("critic_agent", "select_task")
graph.add_edge("answer_agent", "select_task")

# finish -> END
graph.add_edge("finish", END)

app = graph.compile()


In [27]:
from pprint import pprint

def run_with_trace(app, initial_state):
    last_resources = None

    for event in app.stream(initial_state, stream_mode="values"):
        # event — это текущее значение state после шага
        resources = event.get("resources")
        if resources and resources != last_resources:
            print("\n=== RESOURCES UPDATED ===")
            pprint(resources)
            last_resources = resources

        # если ты хранишь выбор модели внутри intermediate_results — тоже можно печатать
        # pprint(event.get("intermediate_results"))

    return event  # последнее состояние

final_state = run_with_trace(app, {
    "query": "What is the official website of the University of Oxford?",
    "tasks": [],
    "current_task_id": None,
    "resources": {},
    "intermediate_results": {},
    "final_answer": None,
})
print("\nFINAL ANSWER:", final_state.get("final_answer"))



=== RESOURCES UPDATED ===
{'T1': {'max_tokens': 200,
        'model': 'openai/gpt-4o-mini',
        'temperature': 0.5,
        'tools_policy': {'allow_human': False, 'allow_search': True}},
 'T2': {'max_tokens': 600,
        'model': 'openai/gpt-4o-mini',
        'temperature': 0.2,
        'tools_policy': {'allow_human': False, 'allow_search': False}},
 'T3': {'max_tokens': 3200,
        'model': 'openai/gpt-4o-mini',
        'temperature': 0.7,
        'tools_policy': {'allow_human': False, 'allow_search': False}}}

FINAL ANSWER: {'final_answer': 'The official website of the University of Oxford is https://www.ox.ac.uk/.', 'used_facts': ['The official website of the University of Oxford is https://www.ox.ac.uk/', 'The University of Oxford is located in Oxford, England.', 'The University of Oxford is the oldest university in the English-speaking world.'], 'ignored_or_flagged_points': [], 'confidence': 'high', 'follow_up_needed': False, 'follow_up_queries': []}


# Эксперименты

## Отдельный запрос

In [28]:
def run_agent(app, query: str):
    initial_state: GraphState = {
        "query": query,
        "tasks": [],
        "current_task_id": None,
        "resources": {},
        "intermediate_results": {},
        "final_answer": None,
    }

    result_state = app.invoke(initial_state)
    return result_state["final_answer"]


# === запуск ===
answer = run_agent(app, "What is the official website of the University of Oxford?")
print(answer)


{'final_answer': 'The official website of the University of Oxford is [ox.ac.uk](https://www.ox.ac.uk/).', 'used_facts': ['The official website of the University of Oxford is ox.ac.uk.'], 'ignored_or_flagged_points': [], 'confidence': 'high', 'follow_up_needed': False, 'follow_up_queries': []}


## Установка GAIA

In [29]:
#!pip -q install datasets huggingface_hub tqdm


[notice] A new release of pip is available: 25.1.1 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [19]:
!pip install hf_xet




[notice] A new release of pip is available: 25.1.1 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [20]:
from datasets import load_dataset
from huggingface_hub import snapshot_download
import os

DATA_DIR = snapshot_download(
    repo_id="gaia-benchmark/GAIA",
    repo_type="dataset",
    token=HF_TOKEN,
)

CONFIG_NAME = "2023_all"     # можно "2023_level1", "2023_level2", "2023_level3"
SPLIT = "validation"         # "validation" для локального скоринга

ds = load_dataset(DATA_DIR, CONFIG_NAME, split=SPLIT)
print("Rows:", len(ds))
print("Columns:", ds.column_names)
ds[0]


Fetching 119 files:   0%|          | 0/119 [00:00<?, ?it/s]

Rows: 165
Columns: ['task_id', 'Question', 'Level', 'Final answer', 'file_name', 'file_path', 'Annotator Metadata']


{'task_id': 'c61d22de-5f6c-4958-a7f6-5e9707bd3466',
 'Question': 'A paper about AI regulation that was originally submitted to arXiv.org in June 2022 shows a figure with three axes, where each axis has a label word at both ends. Which of these words is used to describe a type of society in a Physics and Society article submitted to arXiv.org on August 11, 2016?',
 'Level': '2',
 'Final answer': 'egalitarian',
 'file_name': '',
 'file_path': '',
 'Annotator Metadata': {'Steps': '1. Go to arxiv.org and navigate to the Advanced Search page.\n2. Enter "AI regulation" in the search box and select "All fields" from the dropdown.\n3. Enter 2022-06-01 and 2022-07-01 into the date inputs, select "Submission date (original)", and submit the search.\n4. Go through the search results to find the article that has a figure with three axes and labels on each end of the axes, titled "Fairness in Agreement With European Values: An Interdisciplinary Perspective on AI Regulation".\n5. Note the six words 

In [21]:
ds[90]

{'task_id': '6359a0b1-8f7b-499b-9336-840f9ab90688',
 'Question': 'What is the area of the green polygon in the attached file? The numbers in purple represent the lengths of the side they are next to.',
 'Level': '2',
 'Final answer': '39',
 'file_name': '6359a0b1-8f7b-499b-9336-840f9ab90688.png',
 'file_path': '2023/validation/6359a0b1-8f7b-499b-9336-840f9ab90688.png',
 'Annotator Metadata': {'Steps': '1. Open the attached file.\n2. Split the shape into five rectangles.\n3. Find the missing side lengths from the side lengths that are given.\n4. Find the area for each rectangle.\n5. Add the areas together to get the area of the entire shape, 39.',
  'Number of steps': '5',
  'How long did this take?': '5-10 minutes',
  'Tools': '1. Image recognition\n2. OCR\n3. Calculator',
  'Number of tools': '3'}}

In [2]:
## Преобразование данных

from pathlib import Path
from pprint import pprint

# === БАЗОВЫЕ ПУТИ (как ты и просил) ===
PROJECT_ROOT = Path(r"C:/Users/oklad/Desktop/LLM_Games").resolve()
GAIA_FILES_ROOT = PROJECT_ROOT / "workspace" / "gaia_files"

items = []

for i, row in enumerate(ds):
    qid = row.get("task_id", f"gaia_{i:05d}")

    # Поля GAIA (как ты показал)
    question = row.get("Question")
    ground_truth = row.get("Final answer")

    item = {
        "id": qid,
        "question": question,
        "ground_truth": ground_truth,
        "level": row.get("Level"),
    }

    # GAIA file info
    file_path = (row.get("file_path") or "").strip()
    file_name = (row.get("file_name") or "").strip()

    if file_path:
        # GAIA обычно даёт относительный путь вроде:
        # 2023/validation/xxxxxxxx.png
        rel_path = Path(file_path)

        # ВСЕГДА маппим в workspace/gaia_files
        abs_path = GAIA_FILES_ROOT / rel_path

        file_ref = {
            "id": rel_path.as_posix(),
            "name": file_name or rel_path.name,
            "path": str(abs_path),
            "source": "gaia",
            "mime": None,                 # можно определить позже
            "original_path": rel_path.as_posix(),
        }

        item["files"] = [file_ref]

    items.append(item)

print("Prepared items:", len(items))
print("Example item:")
pprint(items[90])


NameError: name 'ds' is not defined

## Запуск эксперимента

- Прогоняются все задачи gaia
- Сохраняются логи работы для каждой задачи 
- Рассчитываются итоговые метрики

In [31]:
import os, json, time, re, traceback
from typing import Any, Dict, List, Optional
from pprint import pprint


# ---------- utils ----------
def _ensure_dir(p: str) -> None:
    os.makedirs(p, exist_ok=True)

def _write_json(p: str, obj: Any) -> None:
    with open(p, "w", encoding="utf-8") as f:
        json.dump(obj, f, ensure_ascii=False, indent=2)

def _append_jsonl(p: str, obj: Any) -> None:
    with open(p, "a", encoding="utf-8") as f:
        f.write(json.dumps(obj, ensure_ascii=False) + "\n")

def _norm(s: str) -> str:
    s = (s or "").strip().lower()
    s = re.sub(r"\s+", " ", s)
    s = s.strip(" \t\n\r.,;:!?'\"`~()[]{}<>")
    return s

def _extract_first_number(s: str) -> Optional[float]:
    m = re.search(r"[-+]?\d+(?:[.,]\d+)?", s or "")
    if not m:
        return None
    try:
        return float(m.group(0).replace(",", "."))
    except Exception:
        return None

def _safe_get_question(item: Dict[str, Any]) -> str:
    return item.get("question") or item.get("query") or item.get("prompt") or ""

def _safe_get_gt(item: Dict[str, Any]) -> Optional[str]:
    return item.get("ground_truth") or item.get("answer") or item.get("gt")

def _safe_get_id(item: Dict[str, Any], idx: int) -> str:
    return str(item.get("id") or item.get("task_id") or item.get("gaia_task_id") or f"item_{idx:05d}")

def score_prediction(pred: str, gt: Optional[str]) -> Dict[str, Any]:
    """
    Basic metrics:
    - exact_match
    - normalized_match
    - numeric_match (if both sides contain a number)
    """
    pred = pred or ""
    out = {
        "exact_match": False,
        "normalized_match": False,
        "numeric_match": None,
        "pred_norm": _norm(pred),
        "gt_norm": _norm(gt or "") if gt is not None else None,
    }
    if gt is None:
        return out

    out["exact_match"] = (pred.strip() == gt.strip())
    out["normalized_match"] = (_norm(pred) == _norm(gt))

    pnum = _extract_first_number(pred)
    gnum = _extract_first_number(gt)
    if pnum is None or gnum is None:
        out["numeric_match"] = None
    else:
        eps_abs = 1e-3
        eps_rel = 1e-3
        out["numeric_match"] = (abs(pnum - gnum) <= max(eps_abs, eps_rel * max(1.0, abs(gnum))))

    return out


def _pick_answer_string(final_state: Dict[str, Any]) -> str:
    """
    Return the string we should SCORE.
    Priority:
    1) final_state["final_answer"] if it is a non-empty str (your new answer_agent should set strict here)
    2) answer task.result["final_answer_strict"] or ["final_answer"]
    3) answer task.result if it is a str
    else ""
    """
    if not isinstance(final_state, dict):
        return ""

    fa = final_state.get("final_answer")
    if isinstance(fa, str) and fa.strip():
        return fa.strip()

    tasks = final_state.get("tasks") or []
    ans_task = next((t for t in tasks if t.get("role") == "answer"), None)
    if not ans_task:
        return ""

    res = ans_task.get("result")
    if isinstance(res, dict):
        s = res.get("final_answer_strict") or res.get("final_answer") or ""
        return str(s).strip()
    if isinstance(res, str):
        return res.strip()

    return ""


# ---------- streaming trace (intermediate results) ----------
def _run_with_trace(app, initial_state: Dict[str, Any], *, verbose: bool = True) -> Dict[str, Any]:
    """
    Streams `state` values and prints intermediate progress:
    - when resources updated
    - when current_task_id changes
    - when tasks status changes
    - intermediate_results keys
    - final_answer preview
    Returns final state.
    """
    last_resources = None
    last_task_id = None
    last_tasks_snapshot = None
    last_intermediate_keys = None

    final_state = None

    try:
        iterator = app.stream(initial_state, stream_mode="values")
    except TypeError:
        iterator = app.stream(initial_state)

    for state in iterator:
        final_state = state
        if not verbose:
            continue

        # current task change
        cur_id = state.get("current_task_id")
        if cur_id != last_task_id:
            print("\n--- CURRENT TASK ---")
            print("current_task_id:", cur_id)
            if cur_id:
                task = next((t for t in state.get("tasks", []) if t.get("id") == cur_id), None)
                if task:
                    print("role:", task.get("role"))
                    print("desc:", (task.get("description") or "")[:300])
            last_task_id = cur_id

        # resources updates
        resources = state.get("resources")
        if resources and resources != last_resources:
            print("\n=== RESOURCES UPDATED ===")
            pprint(resources)
            last_resources = resources

        # tasks snapshot
        tasks_snapshot = [(t.get("id"), t.get("role"), t.get("status")) for t in state.get("tasks", [])]
        if tasks_snapshot != last_tasks_snapshot:
            print("\n=== TASKS STATUS ===")
            pprint(tasks_snapshot)
            last_tasks_snapshot = tasks_snapshot

        # intermediate results keys
        interm = state.get("intermediate_results") or {}
        if isinstance(interm, dict):
            keys = list(interm.keys())
            if keys != last_intermediate_keys:
                print("\n=== INTERMEDIATE RESULTS KEYS ===")
                print(keys[:50])
                last_intermediate_keys = keys

        # final answer preview (string or dict; preview safely)
        if state.get("final_answer"):
            print("\n=== FINAL ANSWER (preview) ===")
            fa = state.get("final_answer")
            print((fa if isinstance(fa, str) else str(fa))[:800])

    return final_state or initial_state


# ---------- pre-flight ----------
assert "items" in globals() and isinstance(items, list) and len(items) > 0, \
    "items must be a non-empty list (you already prepared it from GAIA ds)."

assert "app" in globals(), "app must exist (compiled graph: app = graph.compile())."

# ---------- output folder ----------
RUN_NAME = time.strftime("gaia_run_%Y%m%d_%H%M%S")
OUT_DIR = os.path.join("gaia_validation_runs", RUN_NAME)
PER_ITEM_DIR = os.path.join(OUT_DIR, "items")
_ensure_dir(PER_ITEM_DIR)

SUMMARY_JSONL = os.path.join(OUT_DIR, "results.jsonl")
SUMMARY_JSON = os.path.join(OUT_DIR, "summary.json")

print("Output dir:", OUT_DIR)

# ---------- knobs ----------
MAX_ITEMS = None          # e.g. 10 for quick test; None = all
VERBOSE_STREAM = True     # set False to reduce console spam
SLEEP_BETWEEN = 0.0       # e.g. 0.2 if you want calmer output

# ---------- main evaluation loop ----------
results: List[Dict[str, Any]] = []
n_total = len(items)
n = min(n_total, MAX_ITEMS) if MAX_ITEMS else n_total

for idx in range(n):
    item = items[idx]
    qid = _safe_get_id(item, idx)
    query = _safe_get_question(item)
    gt = _safe_get_gt(item)
    files = item.get("files")

    print("\n" + "=" * 110)
    print(f"[{idx+1}/{n}] GAIA id={qid}  level={item.get('level')}")
    print("Q:", (query or "")[:2000])

    init_state: Dict[str, Any] = {
        "query": query,
        "tasks": [],
        "current_task_id": None,
        "resources": {},
        "intermediate_results": {},
        "final_answer": None,

        # GAIA additions
        "gaia_task_id": qid,
        "ground_truth": gt,
    }
    if files:
        init_state["files"] = files

    t0 = time.time()
    error_text: Optional[str] = None
    final_state: Optional[Dict[str, Any]] = None

    try:
        if hasattr(app, "stream"):
            final_state = _run_with_trace(app, init_state, verbose=VERBOSE_STREAM)
        else:
            final_state = app.invoke(init_state)
    except Exception:
        error_text = traceback.format_exc()

    dt = time.time() - t0

    tasks_out = None
    resources_out = None
    interm_out = None

    # raw answer for logs (could be str or dict depending on your pipeline)
    answer_raw = None
    # prediction string that we actually score
    answer_pred = ""

    if final_state:
        tasks_out = final_state.get("tasks")
        resources_out = final_state.get("resources")
        interm_out = final_state.get("intermediate_results")

        answer_raw = final_state.get("final_answer")
        answer_pred = _pick_answer_string(final_state)

    if error_text:
        print("\n[ERROR]")
        print(error_text[-4000:])
    else:
        print("\n[FINAL ANSWER (strict scored)]")
        print(answer_pred[:2000])

    item_score = score_prediction(answer_pred, gt)

    item_record = {
        "id": qid,
        "query": query,
        "ground_truth": gt,

        # keep both:
        "answer_raw": answer_raw,
        "answer_pred": answer_pred,

        "error": error_text,
        "duration_sec": dt,

        # requested artifacts:
        "tasks": tasks_out,
        "resources": resources_out,

        # optional debug:
        "level": item.get("level"),
        "files": files,
        "intermediate_results_keys": list((interm_out or {}).keys()) if isinstance(interm_out, dict) else None,
        "metrics": item_score,
    }

    # save per item
    _write_json(os.path.join(PER_ITEM_DIR, f"{qid}.json"), item_record)

    # append live progress
    _append_jsonl(SUMMARY_JSONL, {
        "id": qid,
        "level": item.get("level"),
        "duration_sec": dt,
        "error": bool(error_text),
        "answer_preview": (answer_pred[:200] if answer_pred else None),
        "metrics": item_score,
    })

    results.append(item_record)

    if SLEEP_BETWEEN:
        time.sleep(SLEEP_BETWEEN)


# ---------- aggregate metrics ----------
total = len(results)
ok = sum(1 for r in results if not r["error"])
exact = sum(1 for r in results if r["metrics"]["exact_match"] is True)
norm = sum(1 for r in results if r["metrics"]["normalized_match"] is True)

num_match = sum(1 for r in results if r["metrics"]["numeric_match"] is True)
num_applicable = sum(1 for r in results if r["metrics"]["numeric_match"] is not None)

summary = {
    "run_dir": OUT_DIR,
    "total": total,
    "completed_without_error": ok,
    "exact_match_acc": exact / total if total else 0.0,
    "normalized_match_acc": norm / total if total else 0.0,
    "numeric_match_acc_applicable": (num_match / num_applicable) if num_applicable else None,
    "numeric_applicable": num_applicable,
    "timestamp": RUN_NAME,
}

_write_json(SUMMARY_JSON, summary)

print("\n" + "=" * 110)
print("DONE. Summary:")
print(json.dumps(summary, ensure_ascii=False, indent=2))
print("Per-item JSON:", PER_ITEM_DIR)
print("Progress JSONL:", SUMMARY_JSONL)
print("Summary JSON:", SUMMARY_JSON)


Output dir: gaia_validation_runs\gaia_run_20260109_000000

[1/165] GAIA id=c61d22de-5f6c-4958-a7f6-5e9707bd3466  level=2
Q: A paper about AI regulation that was originally submitted to arXiv.org in June 2022 shows a figure with three axes, where each axis has a label word at both ends. Which of these words is used to describe a type of society in a Physics and Society article submitted to arXiv.org on August 11, 2016?

=== TASKS STATUS ===
[]

=== INTERMEDIATE RESULTS KEYS ===
[]

=== TASKS STATUS ===
[('T1', 'search', 'pending'),
 ('T2', 'search', 'pending'),
 ('T3', 'document', 'pending'),
 ('T4', 'document', 'pending'),
 ('T5', 'extract', 'pending'),
 ('T6', 'reason', 'pending'),
 ('T7', 'answer', 'pending')]

=== RESOURCES UPDATED ===
{'T1': {'max_tokens': 600,
        'model': 'openai/gpt-4o-mini',
        'temperature': 0.3,
        'tools_policy': {'allow_human': False,
                         'allow_python': False,
                         'allow_search': True}},
 'T2': {'max_

Python REPL can execute arbitrary code. Use with caution.



=== TASKS STATUS ===
[('T1', 'search', 'done'),
 ('T2', 'extract', 'done'),
 ('T3', 'calc', 'done'),
 ('T4', 'answer', 'pending')]

=== INTERMEDIATE RESULTS KEYS ===
['T1', 'T2', 'T3']

--- CURRENT TASK ---
current_task_id: T4
role: answer
desc: Предоставить финальный ответ пользователю о количестве неверных статей.

=== TASKS STATUS ===
[('T1', 'search', 'done'),
 ('T2', 'extract', 'done'),
 ('T3', 'calc', 'done'),
 ('T4', 'answer', 'in_progress')]

[ERROR]
Traceback (most recent call last):
  File "C:\Users\oklad\AppData\Local\Temp\ipykernel_21032\3699963541.py", line 249, in <module>
    final_state = _run_with_trace(app, init_state, verbose=VERBOSE_STREAM)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\oklad\AppData\Local\Temp\ipykernel_21032\3699963541.py", line 141, in _run_with_trace
    for state in iterator:
  File "c:\Users\oklad\anaconda3\Lib\site-packages\langgraph\pregel\__init__.py", line 2461, in stream
    for _ in runner.ti

In [33]:
# ---------- aggregate metrics ----------
total = len(results)
ok = sum(1 for r in results if not r["error"])
exact = sum(1 for r in results if r["metrics"]["exact_match"] is True)
norm = sum(1 for r in results if r["metrics"]["normalized_match"] is True)

num_match = sum(1 for r in results if r["metrics"]["numeric_match"] is True)
num_applicable = sum(1 for r in results if r["metrics"]["numeric_match"] is not None)

summary = {
    "run_dir": OUT_DIR,
    "total": total,
    "completed_without_error": ok,
    "exact_match_acc": exact / total if total else 0.0,
    "normalized_match_acc": norm / total if total else 0.0,
    "numeric_match_acc_applicable": (num_match / num_applicable) if num_applicable else None,
    "numeric_applicable": num_applicable,
    "timestamp": RUN_NAME,
}

_write_json(SUMMARY_JSON, summary)

print("\n" + "=" * 110)
print("DONE. Summary:")
print(json.dumps(summary, ensure_ascii=False, indent=2))
print("Per-item JSON:", PER_ITEM_DIR)
print("Progress JSONL:", SUMMARY_JSONL)
print("Summary JSON:", SUMMARY_JSON)



DONE. Summary:
{
  "run_dir": "gaia_validation_runs\\gaia_run_20260109_000000",
  "total": 165,
  "completed_without_error": 134,
  "exact_match_acc": 0.04242424242424243,
  "normalized_match_acc": 0.04242424242424243,
  "numeric_match_acc_applicable": 0.15555555555555556,
  "numeric_applicable": 45,
  "timestamp": "gaia_run_20260109_000000"
}
Per-item JSON: gaia_validation_runs\gaia_run_20260109_000000\items
Progress JSONL: gaia_validation_runs\gaia_run_20260109_000000\results.jsonl
Summary JSON: gaia_validation_runs\gaia_run_20260109_000000\summary.json


In [41]:
import os
import shutil
from pathlib import Path
from typing import Dict, Any, Optional

def guess_mime_from_name(name: str) -> str:
    n = (name or "").lower()
    if n.endswith(".pdf"): return "application/pdf"
    if n.endswith(".docx"): return "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
    if n.endswith(".pptx"): return "application/vnd.openxmlformats-officedocument.presentationml.presentation"
    if n.endswith(".xlsx"): return "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
    if n.endswith(".zip"): return "application/zip"
    if n.endswith(".png"): return "image/png"
    if n.endswith(".jpg") or n.endswith(".jpeg"): return "image/jpeg"
    return "application/octet-stream"

def gaia_example_to_state(
    ex: Dict[str, Any],
    data_dir: str,
    gaia_files_root: str,
    *,
    copy_attachments: bool = True,
) -> Dict[str, Any]:
    """
    GAIA row -> GraphState (под твой app.invoke()).
    Делает manifest файлов в state["files"].
    По умолчанию копирует attachments в GAIA_FILES_ROOT, чтобы file-analysis MCP мог их читать.
    """
    data_dir = str(Path(data_dir).resolve())
    gaia_files_root = Path(gaia_files_root).resolve()
    gaia_files_root.mkdir(parents=True, exist_ok=True)

    task_id = (ex.get("task_id") or "").strip() or None
    question = ex.get("Question") or ex.get("question") or ""
    rel_fp = (ex.get("file_path") or "").strip()
    file_name = (ex.get("file_name") or "").strip()

    files = []

    if rel_fp:
        src = Path(data_dir) / rel_fp
        if not src.exists():
            raise FileNotFoundError(f"GAIA attachment not found: {src}")

        # как назвать файл в sandbox
        base_name = file_name if file_name else src.name
        # чтобы не было коллизий имён
        dst_name = f"{task_id}_{base_name}" if task_id else base_name
        dst = gaia_files_root / dst_name

        if copy_attachments:
            # копируем только если нет или отличается размер
            if (not dst.exists()) or (dst.stat().st_size != src.stat().st_size):
                shutil.copy2(src, dst)
        else:
            # если решишь разрешить чтение прямо из HF cache (позже)
            dst = src

        files.append({
            "id": "f1",
            "name": dst.name,
            "path": str(dst),
            "source": "gaia_dataset",
            "mime": guess_mime_from_name(dst.name),
            "original_path": str(src),
        })

    state = {
        # твой базовый контракт
        "query": question,
        "tasks": [],
        "current_task_id": None,
        "resources": {},
        "intermediate_results": {},
        "final_answer": None,

        # GAIA additions
        "gaia_task_id": task_id,
        "files": files,
        "data_dir": data_dir,
        "ground_truth": (ex.get("Final answer") or None),  # только для validation; в test может не быть
    }
    return state

In [46]:
def run_agent_state(app, initial_state: dict) -> dict:
    return app.invoke(initial_state)

def run_agent_on_gaia(app, ex: dict, data_dir: str, gaia_files_root: str) -> str:
    state = gaia_example_to_state(ex, data_dir, gaia_files_root, copy_attachments=True)
    result_state = run_agent_state(app, state)
    return result_state["final_answer"]

In [47]:
GAIA_FILES_ROOT = r"C:\Users\oklad\Desktop\LLM_Games\workspace\gaia_files"

ex = {
 'task_id': '6359a0b1-8f7b-499b-9336-840f9ab90688',
 'Question': 'What is the area of the green polygon in the attached file? The numbers in purple represent the lengths of the side they are next to.',
 'file_name': '6359a0b1-8f7b-499b-9336-840f9ab90688.png',
 'file_path': '2023/validation/6359a0b1-8f7b-499b-9336-840f9ab90688.png',
 'Final answer': '39'
}

state = gaia_example_to_state(ex, DATA_DIR, GAIA_FILES_ROOT)
print(state["gaia_task_id"])
print(state["query"])
print(state["files"])

6359a0b1-8f7b-499b-9336-840f9ab90688
What is the area of the green polygon in the attached file? The numbers in purple represent the lengths of the side they are next to.
[{'id': 'f1', 'name': '6359a0b1-8f7b-499b-9336-840f9ab90688_6359a0b1-8f7b-499b-9336-840f9ab90688.png', 'path': 'C:\\Users\\oklad\\Desktop\\LLM_Games\\workspace\\gaia_files\\6359a0b1-8f7b-499b-9336-840f9ab90688_6359a0b1-8f7b-499b-9336-840f9ab90688.png', 'source': 'gaia_dataset', 'mime': 'image/png', 'original_path': 'C:\\Users\\oklad\\.cache\\huggingface\\hub\\datasets--gaia-benchmark--GAIA\\snapshots\\682dd723ee1e1697e00360edccf2366dc8418dd9\\2023\\validation\\6359a0b1-8f7b-499b-9336-840f9ab90688.png'}]


In [48]:
result_state = app.invoke(state)

In [50]:
result_state["final_answer"]

{'final_answer': "I cannot determine the area of the green polygon without the visual representation or specific measurements provided in the attached file. Please provide the dimensions or a description of the polygon's shape.",
 'used_facts': [],
 'ignored_or_flagged_points': [],
 'confidence': 'low',
 'follow_up_needed': True,
 'follow_up_queries': ['Can you describe the shape of the polygon or provide the lengths of all its sides?']}

In [1]:
import os, asyncio
from pathlib import Path

# --- настройки проекта ---
PROJECT_ROOT = Path(r"C:/Users/oklad/Desktop/LLM_Games").resolve()
GAIA_FILES_ROOT = PROJECT_ROOT / "workspace" / "gaia_files"
GAIA_FILES_ROOT.mkdir(parents=True, exist_ok=True)

# положи документ сюда
DOC_PATH = GAIA_FILES_ROOT / "example.pdf"

def build_env():
    env = dict(os.environ)
    env["PYTHONPATH"] = str(PROJECT_ROOT) + os.pathsep + env.get("PYTHONPATH", "")
    env["GAIA_FILES_ROOT"] = str(GAIA_FILES_ROOT)
    # env["OPENROUTER_API_KEY"] = "..."  # если нужен vision
    return env

async def main():
    # ВАЖНО: stderr в файл (иначе fileno в Jupyter на Windows)
    errlog_path = PROJECT_ROOT / "mcp_document_server.stderr.log"
    errlog = open(errlog_path, "wb")  # бинарный режим надёжнее для pipes на Windows

    # Импорты MCP клиента
    from mcp.client.stdio import stdio_client
    from mcp import ClientSession
    from mcp.client.stdio import StdioServerParameters

    server_params = StdioServerParameters(
        command="python",
        args=["-m", "automas.mcp.servers.document.server"],
        env=build_env(),
        # если в твоей версии MCP это поле есть — отлично:
        # stderr=errlog,
    )

    # Некоторые версии MCP принимают errlog прямо в stdio_client,
    # а не в StdioServerParameters. Поэтому используем try/except:
    try:
        async with stdio_client(server_params, errlog=errlog) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()

                tools = await session.list_tools()
                print("TOOLS:", [t.name for t in tools.tools])

                res = await session.call_tool("extract_text", {"file_path": str(DOC_PATH)})
                print("\n--- extract_text (first 2000 chars) ---\n")
                print(str(res)[:2000])

    except TypeError:
        # fallback: если stdio_client(...) не поддерживает errlog
        # и нужно передать stderr в StdioServerParameters
        server_params = StdioServerParameters(
            command="python",
            args=["-m", "automas.mcp.servers.document.server"],
            env=build_env(),
            stderr=errlog,   # <-- часто именно так
        )
        async with stdio_client(server_params) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()
                res = await session.call_tool("extract_text", {"file_path": str(DOC_PATH)})
                print(str(res)[:2000])

    finally:
        errlog.close()
        print(f"\nMCP stderr log: {errlog_path}")

await main()


TOOLS: ['extract_text', 'read_image', 'read_pdf', 'read_docx', 'read_pptx', 'read_xlsx_xls', 'extract_and_list_zip', 'list_zip_contents']

--- extract_text (first 2000 chars) ---

meta=None content=[TextContent(type='text', text='ДОГОВОР найма жилого помещения\n\nГ. Санкт-Петербург\n\nот « _____» __________ 2025г.\n\nМы,  гр.  РФ  Хоружина  Нина  Петровна,  паспорт  гражданина  РФ  40  15  263768,  выдан  ТП  №65  Отдела\n\nУФМС России по Санкт-Петербургу и Ленинградской обл. в Приморском р-не г. Санкт-Петербурга 09.04.2015г.,\n\nкод  подразделения780-065,  зарегистрированная  по  адресу:  гор.  Санкт-Петербург,  ул.  Мебельная,  д.21,  корп.2,\n\nкв.87, действующая через представителя Москаленко Наталью Владимировну, паспорт 40 11 527899, выданный\n\nТП  №50  Отдела  УФМС  России  по  Санкт-Петербургу  и  Ленинградской  обл.  в  Московском  р-не  гор.  Санкт-\n\nПетербурга,  дата  выдачи:  15.06.2012г.,  код  подразделения  780-050,  зарегистрированная  по  адресу:  гор.  Санкт-\n\nПе

In [22]:
import os
from pathlib import Path

# 1) чтобы ноутбук видел твой проект (важно!)
PROJECT_ROOT = Path(r"C:/Users/oklad/Desktop/LLM_Games").resolve()
os.environ["PYTHONPATH"] = str(PROJECT_ROOT) + os.pathsep + os.environ.get("PYTHONPATH","")

# 2) теперь импорт
from automas.mcp_client_file_analysis import extract_text, GAIA_FILES_ROOT

DOC = GAIA_FILES_ROOT / "example.pdf"

text = await extract_text(str(DOC))
print(text[:2000])

ДОГОВОР найма жилого помещения

Г. Санкт-Петербург

от « _____» __________ 2025г.

Мы,  гр.  РФ  Хоружина  Нина  Петровна,  паспорт  гражданина  РФ  40  15  263768,  выдан  ТП  №65  Отдела

УФМС России по Санкт-Петербургу и Ленинградской обл. в Приморском р-не г. Санкт-Петербурга 09.04.2015г.,

код  подразделения780-065,  зарегистрированная  по  адресу:  гор.  Санкт-Петербург,  ул.  Мебельная,  д.21,  корп.2,

кв.87, действующая через представителя Москаленко Наталью Владимировну, паспорт 40 11 527899, выданный

ТП  №50  Отдела  УФМС  России  по  Санкт-Петербургу  и  Ленинградской  обл.  в  Московском  р-не  гор.  Санкт-

Петербурга,  дата  выдачи:  15.06.2012г.,  код  подразделения  780-050,  зарегистрированная  по  адресу:  гор.  Санкт-

Петербург,  ул.  Орджоникидзе,  д.34,  кв.37  по  доверенности  от  08.09.2022г.,заверенной  Богомоловой  Татьяной

Владимировной,  временно  исполняющей  обязанности  нотариуса  нотариального  округа  Санкт-Петербург

Сахарова  А.А.,  зарегистрирова

In [35]:
import os, asyncio
from pathlib import Path
from typing import Dict, Any

from mcp.client.stdio import stdio_client, StdioServerParameters
from mcp import ClientSession

os.environ["OPENROUTER_API_KEY"] = "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"

# === НАСТРОЙКИ ===
PROJECT_ROOT = Path(r"C:/Users/oklad/Desktop/LLM_Games").resolve()
GAIA_FILES_ROOT = PROJECT_ROOT / "workspace" / "gaia_files"
MCP_MODULE = "automas.mcp.servers.media.server"

IMG = GAIA_FILES_ROOT / "2023/validation/6359a0b1-8f7b-499b-9336-840f9ab90688.png"


def build_env() -> Dict[str, str]:
    env = dict(os.environ)
    env["PYTHONPATH"] = str(PROJECT_ROOT) + os.pathsep + env.get("PYTHONPATH", "")
    env["GAIA_FILES_ROOT"] = str(GAIA_FILES_ROOT)
    return env


async def analyze_image_one_shot(file_path: str, prompt: str) -> str:
    errlog = open(PROJECT_ROOT / "nb_media.stderr.log", "ab")

    params = StdioServerParameters(
        command="python",
        args=["-m", MCP_MODULE],
        env=build_env(),
        cwd=str(PROJECT_ROOT),
    )

    async with stdio_client(params, errlog=errlog) as (r, w):
        async with ClientSession(r, w) as session:
            await asyncio.wait_for(session.initialize(), timeout=10)
            res = await asyncio.wait_for(
                session.call_tool(
                    "analyze_image",
                    {
                        "file_path": str(file_path),
                        "prompt": prompt,
                    },
                ),
                timeout=120,
            )

    texts = []
    for c in res.content:
        texts.append(getattr(c, "text", str(c)))
    return "\n".join(texts)


# === ЗАПУСК ===
text = await analyze_image_one_shot(
    IMG,
    prompt="Describe the image and extract all visible numbers, labels, and shapes.",
)
print(text[:2000])


{"analysis":"The image depicts a geometric figure composed of several rectangular sections, with dimensions labeled in purple. The figure is primarily green with a white central section. Here's a breakdown of the visible elements:\n\n### Numbers:\n- 10 (appears twice)\n- 6 (appears twice)\n- 4 (appears twice)\n- 8\n- 2\n- 1.5\n- 1 (appears twice)\n- 2023 (in yellow, located outside the main figure)\n\n### Labels:\n- The numbers listed above are labels indicating the dimensions of the figure's sections.\n\n### Shapes:\n- The overall shape is a complex, multi-sectioned polygon resembling an irregular L-shape.\n- The figure is composed of rectangular sections, with some sections having smaller rectangles cut out.\n- The central white section is a rectangle with dimensions labeled as 4 and 6.\n- The outer sections are green, with dimensions labeled along their edges.\n\n### Additional Notes:\n- The number \"2023\" is located outside the main figure, in the bottom right corner, and is not p

In [1]:
import requests
from pathlib import Path

DOC = Path(r"C:\Users\oklad\Desktop\LLM_Games\workspace\gaia_files\example.pdf")

r = requests.post(
    "http://127.0.0.1:8009/call/extract_text",
    json={"file_path": str(DOC)}
)
r.raise_for_status()
print(r.json()["texts"][0][:2000])

In [24]:
res = await session.call_tool(
    "read_image",
    {"file_path": str(GAIA_FILES_ROOT / "img.png"),
     "prompt": "Extract all visible text and describe any tables/figures."}
)
print(res)

NameError: name 'session' is not defined

In [27]:
import os, subprocess
from pathlib import Path

PROJECT_ROOT = Path(r"C:/Users/oklad/Desktop/LLM_Games").resolve()
GAIA_FILES_ROOT = PROJECT_ROOT / "workspace" / "gaia_files"

env = dict(os.environ)
env["PYTHONPATH"] = str(PROJECT_ROOT) + os.pathsep + env.get("PYTHONPATH", "")
env["GAIA_FILES_ROOT"] = str(GAIA_FILES_ROOT)

log = open(PROJECT_ROOT / "server_run.stderr.log", "wb")
p = subprocess.Popen(
    ["python", "-m", "automas.mcp.servers.document.server"],
    env=env,
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=log,
)
print("PID:", p.pid)

PID: 20800


In [20]:
from pathlib import Path

PROJECT_ROOT = Path(r"C:\path\to\your_project").resolve()
log_path = PROJECT_ROOT / "mcp_document_server.stderr.log"

print("Log exists:", log_path.exists(), "\nPath:", log_path)
if log_path.exists():
    txt = log_path.read_text(errors="ignore")
    lines = txt.splitlines()
    print("\n--- LAST 200 LINES ---")
    print("\n".join(lines[-200:]))
else:
    print("No log file found yet.")

Log exists: True 
Path: C:\path\to\your_project\mcp_document_server.stderr.log

--- LAST 200 LINES ---
Traceback (most recent call last):
  File "<frozen runpy>", line 189, in _run_module_as_main
  File "<frozen runpy>", line 112, in _get_module_details
  File "c:\Users\oklad\Desktop\LLM_Games\automas\mcp\servers\document\__init__.py", line 4, in <module>
    from automas.mcp.servers.document.xlsx_reader import xlsx_server
  File "c:\Users\oklad\Desktop\LLM_Games\automas\mcp\servers\document\xlsx_reader.py", line 9, in <module>
    from openpyxl_image_loader import SheetImageLoader
ModuleNotFoundError: No module named 'openpyxl_image_loader'


In [21]:
import os, subprocess, textwrap
from pathlib import Path

PROJECT_ROOT = Path(r"C:\path\to\your_project").resolve()

env = dict(os.environ)
env["PYTHONPATH"] = str(PROJECT_ROOT) + os.pathsep + env.get("PYTHONPATH","")

code = textwrap.dedent("""
import sys
print("sys.path[0:3] =", sys.path[:3])
import automas
print("automas OK:", automas.__file__)
import automas.mcp.servers.document.server as s
print("server module OK:", s.__file__)
print("IMPORT_OK")
""")

p = subprocess.run(
    ["python", "-c", code],
    env=env,
    capture_output=True,
    text=True
)

print("Return code:", p.returncode)
print("\n--- STDOUT ---\n", p.stdout)
print("\n--- STDERR ---\n", p.stderr)


Return code: 1

--- STDOUT ---
 sys.path[0:3] = ['', 'C:\\path\\to\\your_project', 'c:\\Users\\oklad\\Desktop\\LLM_Games']
automas OK: None


--- STDERR ---
 Traceback (most recent call last):
  File "<string>", line 6, in <module>
  File "c:\Users\oklad\Desktop\LLM_Games\automas\mcp\servers\document\__init__.py", line 4, in <module>
    from automas.mcp.servers.document.xlsx_reader import xlsx_server
  File "c:\Users\oklad\Desktop\LLM_Games\automas\mcp\servers\document\xlsx_reader.py", line 9, in <module>
    from openpyxl_image_loader import SheetImageLoader
ModuleNotFoundError: No module named 'openpyxl_image_loader'



In [22]:
import sys, subprocess
pkgs = [
    "python-docx",    # docx -> Document
    "python-pptx",    # pptx
    "openpyxl",       # xlsx
    "pdfminer.six",   # извлечение картинок/страниц из pdf
    "markitdown",     # конвертация в markdown
    "fastmcp",        # MCP server
    "openai",
    "openpyxl_image_loader"         # vision через OpenAI SDK / OpenRouter
]
subprocess.check_call([sys.executable, "-m", "pip", "install", "-U", *pkgs])

0