In [1]:
from langchain_openai import ChatOpenAI
import json
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from trino import dbapi

In [31]:
SYSTEM_PROMPT = """
Ты аналитический модуль внутри автоматизированного пайплайна анализа данных.
Твоя задача — строго и последовательно выполнять аналитические функции,
в зависимости от этапа.

Не выходи за рамки ответственности текущего узла.

Контекст среды:
- Источник данных — SQL через Trino,
- Схема базы данных подаётся явно,
- Визуализация выполняется через matplotlib.

Правила:
- Используй только указанные таблицы/поля,
- Не придумывай данные,
- Не делай бизнес-выводов без запроса и данных,
- Возвращай только структурированный результат согласно контракту узла.
- При написании запроса к названию таблицы добавляй iceberg.gold. => iceberg.gold.table_name
""" 

In [3]:
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(
    api_key="sk-pN7mbpwi3acKv4tu4iG8Uw",
    base_url="https://api.artemox.com/v1",  # или свой прокси / gateway
    model="o4-mini",
    temperature=0,
    max_tokens = 1000
)


In [4]:
class AgentState(TypedDict):
    user_input: str
    merged_input: str | None

    schema: dict | None 

    intent: str | None  # например: "analysis_request"
    clarification_required: bool | None
    questions: list[str] | None

    sql_query: str | None
    query_result: object | None
    analytics: str | None
    viz_code: str | None


In [5]:
def schema_to_text(schema: dict | None) -> str:
    if not schema:
        return "Схема данных недоступна."

    lines = []
    for table, columns in schema.get("tables", {}).items():
        lines.append(f"Таблица {table}:")
        for col, meta in columns.items():
            col_type = meta.get("type")
            comment = meta.get("comment")
            if comment:
                lines.append(f"- {col} ({col_type}): {comment}")
            else:
                lines.append(f"- {col} ({col_type})")
        lines.append("")  # пустая строка между таблицами

    return "\n".join(lines)


In [24]:
import json

from trino import dbapi

def schema_introspection_node(state: AgentState) -> dict:
    print(">>> ENTERED schema_introspection_node")

    conn = dbapi.connect(
        host="localhost",
        port=8081,
        user="trino_user",
        catalog="iceberg",
        schema="gold",
    )

    cursor = conn.cursor()

    # 1️⃣ список таблиц
    cursor.execute("SHOW TABLES FROM iceberg.gold")
    tables = [row[0] for row in cursor.fetchall()]

    schema = {
        "layer": "gold",
        "catalog": "iceberg",
        "tables": {}
    }

    # 2️⃣ DESCRIBE + comments
    for table in tables:
        cursor.execute(f"DESCRIBE iceberg.gold.{table}")
        rows = cursor.fetchall()

        columns = {}

        for row in rows:
            column_name = row[0]
            data_type = row[1]
            comment = row[3] if len(row) > 3 else None

            # отсекаем служебные строки
            if not column_name or column_name.startswith("#"):
                continue

            columns[column_name] = {
                "type": data_type,
                "comment": comment
            }

        schema["tables"][table] = columns

    cursor.close()
    conn.close()

    print(">>> SCHEMA WITH COMMENTS LOADED")
    print(schema)

    return {
        "schema": schema
    }


#Узел получения запроса и данных
def intent_node(state: AgentState) -> dict:
    text = state.get("merged_input") or state["user_input"]

    schema = state.get("schema")

    schema_text = schema_to_text(schema)

    print(schema_text)

    prompt = f"""
STAGE: INTENT
Доступные данные:
{schema_text}

USER_QUERY:
{text}

Проверь, хватает ли данных для генерации SQL.

Минимально требуется:
- аналитическая цель
- метрика
- схема БД

Верни JSON строго в формате:
{{
  "clarification_required": boolean,
  "questions": [string]
}}
"""

    response = llm.invoke([
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": prompt},
    ])

    parsed = json.loads(response.content)

    print("\n=== INTENT LLM ===")
    print(response.content)

    return {
        "clarification_required": parsed["clarification_required"],
        "questions": parsed.get("questions"),
    }

#Узел маршрута, по условию хватает ли данных для запроса
def route_after_intent(state: AgentState) -> str:
    """
    Маршрутизация (НЕ LLM):
    - если нужны уточнения -> clarification_node
    - иначе -> sql_exec_node
    """
    if state.get("clarification_required"):
        return "clarification_node"
    return "sql_exec_node"

#Цикл повтора
def clarification_node(state: AgentState) -> dict:
    """
    Одна нода для уточнения:
    - показывает вопросы
    - принимает один ввод пользователя
    - добавляет его к merged_input
    """
    print("\n❓ Не хватает данных. Уточни, пожалуйста:")

    for q in (state.get("questions") or []):
        print(f"- {q}")

    clarification = input("\nВведите уточнение одним сообщением:\n> ").strip()

    base = state.get("merged_input") or state["user_input"]
    merged = f"{base}\n\nУТОЧНЕНИЕ:\n{clarification}"

    return {
        "merged_input": merged
    }

# def sql_planning_node(state: AgentState) -> dict:
#     """
#     Пока заглушка.
#     Тут ты дальше сделаешь генерацию SQL под Trino на основе merged_input + схемы.
#     """
#     final_context = state.get("merged_input") or state["user_input"]

#     print("\n✅ Данных достаточно. Переходим к SQL_PLANNING.")
#     print("\n=== FINAL CONTEXT ===")
#     print(final_context)

#     # заглушка
#     return {}

#Генерация sql
def sql_exec_node(state: AgentState) -> dict:
    """
    Генерирует SQL-запрос (Trino SQL) на основе финального контекста.
    """
    final_context = state.get("merged_input") or state["user_input"]
    schema = state.get("schema")

    schema_text = schema_to_text(schema)
    print("\n✅ Данных достаточно. Переходим к SQL_PLANNING.")
    print("\n=== FINAL CONTEXT ===")
    print(final_context)

    final_text = state.get("merged_input") or state["user_input"]

    prompt = f"""
STAGE: SQL_GENERATION

Доступные данные:
{schema_text}

USER_REQUEST:
{final_text}

Требования:
- Сгенерируй SQL-запрос для Trino
- Используй только те таблицы и поля, которые указаны пользователем
- Если схема БД описана текстом — интерпретируй её буквально
- Если период указан текстом (например, "весь 2021 год") — корректно преобразуй в фильтр по датам
- Если метрика указана — используй её
- Если группировка указана — добавь GROUP BY
- Если чего-то не хватает, сделай разумное допущение, но НЕ задавай вопросов

Верни результат СТРОГО в JSON формате:
{{
  "sql_query": "string"
}}
"""

    response = llm.invoke([
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": prompt},
    ])

    print("\n=== SQL LLM ===")
    print(response.content)

    parsed = json.loads(response.content)

    return {
        "sql_query": parsed["sql_query"]
    }




In [25]:
graph = StateGraph(AgentState)

graph.add_node("schema_introspection_node", schema_introspection_node)
graph.add_node("intent_node", intent_node)
graph.add_node("clarification_node", clarification_node)
#graph.add_node("sql_planning_node", sql_planning_node)
graph.add_node("sql_exec_node", sql_exec_node)


# строим последовательность выполнения
graph.add_edge(START, "schema_introspection_node")
graph.add_edge("schema_introspection_node","intent_node")
graph.add_conditional_edges(
    "intent_node",
    route_after_intent,
    {
        "clarification_node": "clarification_node",
        "sql_exec_node": "sql_exec_node",
    }
)
# цикл уточнений
graph.add_edge("clarification_node", "intent_node")
graph.add_edge("sql_exec_node", END)



compiled = graph.compile()

In [32]:
if __name__ == "__main__":
    user_query = input("Введите запрос:")

    initial_state: AgentState = {
        "user_input": user_query,
        "merged_input": None,

        "clarification_required": None,
        "questions": None,

        "sql_query": None,

        "query_result": None,
        "analytics": None,
        "viz_code": None,
        "schema": None
    }

    print("USER INPUT:", initial_state["user_input"])

    result_state = compiled.invoke(initial_state)
    
    #print("PARSED INTENT:", result_state["intent"])

    print("\n=== FINAL STATE ===")
    for k, v in result_state.items():
        print(f"{k}: {v}")

USER INPUT: В какой стране больше всего игроков?
>>> ENTERED schema_introspection_node
>>> SCHEMA WITH COMMENTS LOADED
{'layer': 'gold', 'catalog': 'iceberg', 'tables': {'dm_genre_performance': {'genre': {'type': 'varchar', 'comment': 'Жанр видеоигр'}, 'games_cnt': {'type': 'bigint', 'comment': 'Количество игр в данном жанре'}, 'avg_price': {'type': 'double', 'comment': 'Средняя стоимость игр в жанре'}, 'players_total_cnt': {'type': 'bigint', 'comment': 'Общее количество игроков, оставивших отзывы по жанру'}, 'recommendation_rate': {'type': 'double', 'comment': 'Средняя доля положительных рекомендаций по жанру'}, 'avg_hours_played': {'type': 'double', 'comment': 'Среднее количество часов, проведённых в играх жанра'}}, 'dm_market_games_overview': {'gameid': {'type': 'varchar', 'comment': 'Уникальный идентификатор игры в Steam'}, 'game_name': {'type': 'varchar', 'comment': 'Наименование игры'}, 'release_date': {'type': 'date', 'comment': 'Дата релиза игры'}, 'price_usd': {'type': 'double

In [None]:
print(result_["schema"])

NameError: name 'result_state' is not defined

In [33]:
print(result_state["sql_query"])

SELECT country, COUNT(*) AS players_cnt
FROM iceberg.gold.dm_player_engagement
GROUP BY country
ORDER BY players_cnt DESC
LIMIT 1;
