In [None]:
from __future__ import annotations

import ast
import json
import os
import re
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

from neo4j import GraphDatabase
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langchain_core.messages import AnyMessage, HumanMessage, SystemMessage, AIMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from typing_extensions import Annotated, TypedDict

os.environ["LANGSMITH_API_KEY"] = os.environ.get("LANGSMITH_API_KEY", "")
os.environ["LANGSMITH_TRACING"] = os.environ.get("LANGSMITH_TRACING", "true")


SYSTEM_PROMPT = """
Ти — агент штучного інтелекту з підтримкою виклику функцій для роботи з Neo4j-графом.
Тобі надаються підписи доступних інструментів у структурованому форматі.
Ти можеш викликати **лише один інструмент** — `search_graph_db`, щоб отримати дані з графа.
Не роби припущень і не вигадуй дані поза результатами виклику інструменту.

Опис графа:
Граф містить такі типи вузлів:
- Person (поля: first_name, full_name, last_name)

Ось доступний інструмент:
[
  {
    "type": "function",
    "function": {
      "name": "search_graph_db",
      "description": "Виконує Cypher-запит до Neo4j-графа",
      "parameters": {
        "type": "object",
        "properties": {
          "query": {
            "type": "string",
            "description": "Cypher-запит для виконання"
          }
        },
        "required": ["query"]
      }
    }
  }
]

Використовуй наступну JSON schema (pydantic model) для кожного виклику інструменту:
{
  "title": "FunctionCall",
  "type": "object",
  "properties": {
    "name": {
      "title": "Name",
      "type": "string"
    },
    "arguments": {
      "title": "Arguments",
      "type": "object"
    }
  },
  "required": ["name", "arguments"]
}

Перед кожним викликом інструменту:
- Проаналізуй запит користувача
- Визнач, яку інформацію потрібно отримати з графа
- Сформуй коректний Cypher-запит

Твої міркування перед викликом інструменту поміщай між тегами:
<think>...</think>

Кожен виклик інструменту повертай **виключно** у вигляді JSON-обʼєкта з імʼям функції та аргументами,
обгорнутого в XML-теги таким чином:

<tool_call>
{JSON з name та arguments}
</tool_call>

Правила використання `search_graph_db`:
- Обовʼязково використовуй інструмент, якщо користувач просить:
  факти з графа, списки, підрахунки, звʼязки, пошук сутностей
- Якщо є сумнів — виконай запит до графа, а не вгадуй

Правила написання Cypher-запитів:
- Використовуй лише READ-ONLY запити: MATCH ... RETURN ...
- Завжди додавай LIMIT (починай з 5–20)
- Повертай лише необхідні поля, а не цілі вузли без потреби
- Використовуй явні аліаси: RETURN p.name AS name
- Для агрегацій використовуй count(*), collect(...), DISTINCT, ORDER BY
- Якщо запит неоднозначний — постав ОДНЕ коротке уточнююче питання
  або виконай простий дослідницький запит

Після отримання результату:
- Формуй відповідь ТІЛЬКИ на основі JSON, який повернув інструмент
- Пояснюй відповідь лаконічно, вказуючи, з яких вузлів і полів вона отримана
- Якщо результат порожній — скажи, що нічого не знайдено,
  і запропонуй наступний запит або уточнення
- Ніколи не стверджуй, що виконав запит, якщо інструмент не був викликаний

Простий приклад:

Запит користувача:
"Знайди 5 людей та покажи їх імена"

Ти маєш викликати інструмент з таким Cypher-запитом:
MATCH (p:Person) RETURN p.name AS name LIMIT 5

Після цього прочитати JSON-відповідь та відповісти:
"Я знайшов 5 вузлів типу Person. Їхні імена: ..."

Нагадування: доступний лише один інструмент — `search_graph_db`.
"""


@dataclass(frozen=True)
class AgentConfig:
    model_name: str
    base_url: str
    api_key: str
    temperature: float = 0.0

    neo4j_uri: str = "bolt://localhost:7687"
    neo4j_user: str = "neo4j"
    neo4j_password: str = "password"
    neo4j_database: Optional[str] = None

    allow_write_queries: bool = False


def make_search_graph_db_tool(cfg: AgentConfig):
    @tool("search_graph_db")
    def search_graph_db(query: str) -> str:
        """
        Query the Neo4j graph database using Cypher queries.
        Args:
            query: A Cypher query string to execute against the Neo4j database.
        Returns:
            JSON string with query results (list of records as dicts).
        """
        q = (query or "").strip()
        if not q:
            return json.dumps({"error": "Empty query"}, ensure_ascii=False)

        if not cfg.allow_write_queries:
            blocked = ("CREATE", "MERGE", "DELETE", "SET", "DROP", "CALL", "LOAD CSV", "APOC")
            upper = q.upper()
            if any(tok in upper for tok in blocked):
                return json.dumps(
                    {
                        "error": "Write/procedure queries are disabled for this agent.",
                        "hint": "Use MATCH/RETURN (read-only) queries.",
                    },
                    ensure_ascii=False,
                )

        try:
            auth = (cfg.neo4j_user, cfg.neo4j_password)
            with GraphDatabase.driver(cfg.neo4j_uri, auth=auth) as driver:
                kwargs = {}
                if cfg.neo4j_database:
                    kwargs["database"] = cfg.neo4j_database

                with driver.session(**kwargs) as session:
                    result = session.run(q)
                    records: List[Dict[str, Any]] = [r.data() for r in result]
                    return json.dumps(records, ensure_ascii=False)
        except Exception as exc:
            return json.dumps({"error": f"Neo4j query failed: {exc}"}, ensure_ascii=False)

    return search_graph_db


TOOL_CALL_RE = re.compile(r"(?s)<tool_call>\s*(\{.*?\})\s*</tool_call>")

def extract_tool_call_from_text(text: str) -> Optional[Dict[str, Any]]:
    """
    Extracts HF-style tool call embedded in assistant text:
    <tool_call>
    {'name': 'search_graph_db', 'arguments': {'query': '...'}}
    </tool_call>
    """
    if not text:
        return None
    m = TOOL_CALL_RE.search(text)
    if not m:
        return None

    payload_raw = m.group(1).strip()

    try:
        payload = ast.literal_eval(payload_raw)
    except Exception:
        try:
            payload = json.loads(payload_raw)
        except Exception:
            return None

    if not isinstance(payload, dict):
        return None
    if payload.get("name") != "search_graph_db":
        return None

    args = payload.get("arguments")
    if not isinstance(args, dict):
        return None

    q = args.get("query")
    if not isinstance(q, str) or not q.strip():
        return None

    return payload


class AgentState(TypedDict):
    messages: Annotated[List[AnyMessage], add_messages]
    pending_tool: Optional[Dict[str, Any]]


class Neo4jLangGraphAgent:
    def __init__(self, cfg: AgentConfig, system_prompt: Optional[str] = None):
        self.cfg = cfg
        self.system_prompt = system_prompt or SYSTEM_PROMPT

        self.tool = make_search_graph_db_tool(cfg)

        self.llm = ChatOpenAI(
            model=cfg.model_name,
            base_url=cfg.base_url,
            api_key=cfg.api_key,
            temperature=cfg.temperature,
        ).bind_tools([self.tool])

        self.graph = self._build_graph()

    def _build_graph(self):
        def assistant_node(state: AgentState) -> Dict[str, Any]:
            resp = self.llm.invoke(state["messages"])
            return {"messages": [resp]}

        def parse_tool_call_node(state: AgentState) -> Dict[str, Any]:
            last = state["messages"][-1]
            content = getattr(last, "content", "") or ""
            tc = extract_tool_call_from_text(content)
            return {"pending_tool": tc}

        def run_tool_node(state: AgentState) -> Dict[str, Any]:
            tc = state.get("pending_tool")
            if not tc:
                return {}

            query = tc["arguments"]["query"]
            tool_out = self.tool.invoke({"query": query})  # returns JSON string

            tool_msg = AIMessage(
                content=(
                    "TOOL_RESULT(search_graph_db) JSON:\n"
                    f"{tool_out}\n\n"
                    "Now answer the user based ONLY on this JSON."
                )
            )
            return {"messages": [tool_msg], "pending_tool": None}

        def route_after_parse(state: AgentState) -> str:
            return "run_tool" if state.get("pending_tool") else END

        g = StateGraph(AgentState)
        g.add_node("assistant", assistant_node)
        g.add_node("parse_tool_call", parse_tool_call_node)
        g.add_node("run_tool", run_tool_node)

        g.add_edge(START, "assistant")
        g.add_edge("assistant", "parse_tool_call")
        g.add_conditional_edges("parse_tool_call", route_after_parse, {"run_tool": "run_tool", END: END})
        g.add_edge("run_tool", "assistant")  # loop back; assistant may decide to call tool again

        return g.compile()

    def invoke(self, user_text: str) -> str:
        init_messages: List[AnyMessage] = [
            SystemMessage(content=self.system_prompt),
            HumanMessage(content=user_text),
        ]
        out = self.graph.invoke({"messages": init_messages, "pending_tool": None})
        final_msg = out["messages"][-1]
        return getattr(final_msg, "content", str(final_msg))

In [None]:
cfg = AgentConfig(
    model_name="lapa-function-calling",              
    base_url="",   
    api_key="",
    neo4j_uri="",
    neo4j_user="neo4j",
    neo4j_password="",
    neo4j_database="neo4j",                     
)

agent = Neo4jLangGraphAgent(cfg)
print(agent.graph.get_graph().draw_ascii())

                +-----------+               
                | __start__ |               
                +-----------+               
                       *                    
                       *                    
                       *                    
                +-----------+               
                | assistant |               
                +-----------+               
                **           **             
              **               **           
            **                   **         
+-----------------+                **       
| parse_tool_call |                 *       
+-----------------+..               *       
          .          .....          *       
          .               ....      *       
          .                   ...   *       
    +---------+               +----------+  
    | __end__ |               | run_tool |  
    +---------+               +----------+  


In [None]:
answer = agent.invoke(
    "Виведи всіх людей та їх прізвища"
)

print(answer)