In [70]:
import logging
import os
from typing import Literal

import psycopg2
from dotenv import load_dotenv
from openai import Client
from psycopg2.extensions import connection
from psycopg2.extras import RealDictCursor
from pydantic import BaseModel, Field


In [72]:
load_dotenv()

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
logger = logging.getLogger(__name__)

llm_api_url = os.getenv("AGENT_LLM_API_URL")
api_key = os.getenv("AGENT_LLM_API_TOKEN")
model = os.getenv("AGENT_LLM_API_MODEL")


In [73]:
client = Client(base_url=llm_api_url, api_key=api_key)

db_params = {
    "host": os.environ.get("POSTGRES_HOST", "127.0.0.1"),
    "port": os.environ.get("POSTGRES_PORT", "5432"),
    "database": os.environ.get("POSTGRES_DB", "your_database"),
    "user": os.environ.get("POSTGRES_USER", "your_username"),
    "password": os.environ.get("POSTGRES_PASSWORD", "your_password"),
}


def get_available_tables(schema: str = "public") -> list[str]:
    conn = psycopg2.connect(**db_params)
    cursor = conn.cursor()
    cursor.execute(
        """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = %s AND table_type = 'BASE TABLE';
        """,
        (schema,),
    )
    tables = [row[0] for row in cursor.fetchall()]
    conn.close()
    logger.debug(f"Available tables: {tables}")
    return tables


def get_table_schema(table_name: str) -> str:
    conn = psycopg2.connect(**db_params)
    cursor = conn.cursor()
    cursor.execute(
        """
        SELECT column_name, data_type
        FROM information_schema.columns
        WHERE table_name = %s;
        """,
        (table_name,),
    )
    columns = cursor.fetchall()
    conn.close()
    schema = "\n".join(f"- {col[0]} ({col[1]})" for col in columns)
    logger.debug(f"Schema for {table_name}:\n{schema}")
    return schema


def get_table_preview(table_name: str, limit: int = 10) -> list[dict]:
    conn = psycopg2.connect(**db_params)
    cursor = conn.cursor(cursor_factory=RealDictCursor)
    cursor.execute(f"SELECT * FROM {table_name} LIMIT {limit}")  # noqa: S608
    rows = cursor.fetchall()
    conn.close()
    logger.debug(f"Preview first {limit} rows from {table_name}: {rows}")
    return rows


def sql_engine(query: str) -> str:
    """Execute validated SQL SELECT queries on the 'resumes' table and returns results as a JSON string."""
    logger.info(f"Executing SQL: {query}")
    try:
        with psycopg2.connect(**db_params) as conn, conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(query)
            try:
                results = cursor.fetchall()
            except psycopg2.ProgrammingError:
                logger.info("Query executed successfully, but no results to fetch.")
                return []
            else:
                logger.debug(f"SQL results: {results}")
                return results
    except psycopg2.errors.SyntaxError:
        logger.exception("Syntax error in SQL query")
    except psycopg2.Error:
        logger.exception("Database error")
    return []


class SQLRequest(BaseModel):
    reasoning: str = Field(..., description="Почему запрос безопасен или опасен")
    is_dangerous: bool = Field(..., description="Флаг опасности")
    sql_query: str | None = Field(None, description="SQL для выполнения, если safe")


class AgentAction(BaseModel):
    function: Literal["sql_engine"]
    reasoning: str = Field(..., description="Напиши свои мысли, как ты формируешь sql запрос")
    is_dangerous: bool
    sql_query: str | None


def analyze_user_message(message: str) -> AgentAction:
    logger.info(f"Analyzing user message: {message}")
    tables = get_available_tables()
    schema_blocks = [f"Table `{tbl}`:\n{get_table_schema(tbl)}" for tbl in tables]
    full_schema = "\n\n".join(schema_blocks)

    system_prompt = (
        "Ты — AI-ассистент, генерирующий SQL-запросы на основе пользовательских запросов.\n"
        "Ниже — схема базы данных PostgreSQL:\n\n"
        f"{full_schema}\n\n"
        "1) Проанализируй запрос.\n"
        "2) Если он опасен или модифицирует данные — установи is_dangerous=true и опиши reasoning.\n"
        "3) Если безопасен — сгенерируй корректный SELECT и верни его в поле sql_query.\n"
        "4) Перефразируй запрос пользователя как комментарий перед SQL.\n"
        "5) Учитывай различные варианты написания специальностей.\n"
        "6) Оптимизируй запрос для минимальной нагрузки на БД."
    )

    response = client.beta.chat.completions.parse(
        model=model,
        temperature=0.5,
        messages=[{"role": "system", "content": system_prompt}, {"role": "user", "content": message}],
        response_format=AgentAction,
    )

    raw = response.choices[0].message.content
    logger.debug(f"Raw LLM response: {raw}")

    action: AgentAction = response.choices[0].message.parsed
    logger.info(f"LLM reasoning: {action.reasoning}")
    logger.debug(f"Parsed AgentAction: {action.model_dump_json()}")
    if action.sql_query:
        logger.info(f"Generated SQL query: {action.sql_query}")
    return action

In [74]:
def format_sql_result_with_llm(user_message: str, sql_query: str, raw_result: str) -> str:
    """Форматирует результат SQL-запроса в человекочитаемый ответ с помощью LLM."""
    system_prompt = (
        "Ты — умный помощник, который объясняет результат SQL-запроса пользователю простыми словами.\n"
        "Вот что нужно сделать:\n"
        "1) Прочитай исходный пользовательский запрос.\n"
        "2) Посмотри, какой SQL был сгенерирован.\n"
        "3) Посмотри на результат (JSON).\n"
        "4) Построй понятный, краткий и точный ответ для пользователя на русском языке.\n"
        "Не добавляй лишней информации, пиши по существу.\n"
    )

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": f"Запрос пользователя:\n{user_message}"},
        {"role": "user", "content": f"Сгенерированный SQL:\n{sql_query}"},
        {"role": "user", "content": f"Результат SQL (JSON):\n{raw_result}"},
    ]

    class FinalAnswer(BaseModel):
        answer: int


    response = client.beta.chat.completions.parse(
        model=model,
        temperature=0.15,
        messages=messages,
        # response_format=FinalAnswer
    )

    reply = response.choices[0].message.content.strip()
    logger.debug(f"LLM-formatted answer: {reply}")
    return reply


In [75]:
def process_user_message(message: str) -> str:
    logger.info(f"User message received: {message}")
    action = analyze_user_message(message)

    if action.is_dangerous:
        logger.warning(f"Dangerous request rejected: {action.reasoning}")
        return f"Запрос отклонён: {action.reasoning}"
    if action.function == "sql_engine" and action.sql_query:
        try:
            raw_results = sql_engine(action.sql_query)
            logger.info(f"Query executed successfully, returned {len(raw_results)} rows")
        except Exception:
            raw_results = "Ошибка при выполнении SQL"
            logger.exception("SQL execution error")
        return format_sql_result_with_llm(
            user_message=message,
            sql_query=action.sql_query,
            raw_result=raw_results,
        )
    logger.error("No valid SQL query generated")
    return "Ваш запрос не имеет отношения к базе данных."


In [76]:
query = "кто шарит за аналитику?"

answer = process_user_message(query)

2025-05-06 02:22:26 [INFO] User message received: кто шарит за аналитику?
2025-05-06 02:22:26 [INFO] Analyzing user message: кто шарит за аналитику?
2025-05-06 02:22:31 [INFO] HTTP Request: POST https://llm-api.cibaa.raiffeisen.ru/chat/completions "HTTP/1.1 200 OK"
2025-05-06 02:22:31 [INFO] LLM reasoning: Запрос пользователя не является опасным и не модифицирует данные. Он ищет резюме, где указаны навыки или опыт, связанные с аналитикой.
2025-05-06 02:22:31 [INFO] Generated SQL query: -- Кто имеет навыки или опыт, связанные с аналитикой?
SELECT id, name, title, summary, skills, experience
FROM resumes
WHERE skills @> ARRAY['аналитик', 'аналитика', 'data analyst', 'data analysis']::text[]
OR experience::text ILIKE ANY (ARRAY['%аналитик%', '%аналитика%', '%data analyst%', '%data analysis%'])
LIMIT 100;
2025-05-06 02:22:31 [INFO] Executing SQL: -- Кто имеет навыки или опыт, связанные с аналитикой?
SELECT id, name, title, summary, skills, experience
FROM resumes
WHERE skills @> ARRAY['ана

Найдены кандидаты с навыками и опытом, связанными с аналитикой:

1. **Николай Еремеевич Наумов** — Data Analyst с 8-летним стажем. Опыт в анализе данных, визуализации и разработке аналитических моделей.
2. **Тетерина София Богдановна** — Data Scientist с 14-летним стажем. Специализируется на машинном обучении и обработке данных.
3. **Зиновий Фёдорович Рогов** — Chief Data Officer (CDO) с 9-летним стажем. Опыт в управлении данными и разработке стратегий.
4. **Щербакова Ольга Натановна** — Head of Data с 13-летним стажем. Специализируется на управлении данными и разработке аналитических решений.
5. **Морозова Фаина Геннадиевна** — Quantitative Analyst с 15-летним стажем. Опыт в разработке алгоритмических моделей для финансовых рынков.
6. **Вячеслав Феликсович Карпов** — Data Steward с 10-летним стажем. Опыт в управлении данными и обеспечении их качества.
7. **Данилов Адам Ярославович** — Head of Data с 6-летним стажем. Специализируется на управлении данными и разработке стратегий.
8. **Ш

# ООП

In [5]:
class DatabaseClient:
    def __init__(self) -> None:
        self.db_params = {
            "host": os.environ.get("POSTGRES_HOST", "127.0.0.1"),
            "port": os.environ.get("POSTGRES_PORT", "5432"),
            "database": os.environ.get("POSTGRES_DB", "your_database"),
            "user": os.environ.get("POSTGRES_USER", "your_username"),
            "password": os.environ.get("POSTGRES_PASSWORD", "your_password"),
        }
    def connect(self) -> connection:
        return psycopg2.connect(**self.db_params)

    def get_tables(self, schema: str = "public") -> list[str]:
        with self.connect() as conn, conn.cursor() as cur:
            cur.execute(
                """
                    SELECT table_name
                    FROM information_schema.tables
                    WHERE table_schema = %s
                      AND table_type = 'BASE TABLE';
                    """,
                (schema,),
            )
            tables = [row[0] for row in cur.fetchall()]
            logger.debug(f"Available tables: {tables}")
            return tables

    def get_schema(self, table_name: str) -> str:
        with self.connect() as conn, conn.cursor() as cur:
            cur.execute(
                """
                    SELECT column_name, data_type
                    FROM information_schema.columns
                    WHERE table_name = %s;
                    """,
                (table_name,),
            )
            cols = cur.fetchall()
            schema = "\n".join(f"- {c[0]} ({c[1]})" for c in cols)
            logger.debug(f"Schema for {table_name}:\n{schema}")
            return schema

    def execute(self, query: str) -> list[dict]:
        logger.info(f"Executing SQL: {query}")
        with self.connect() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(query)
            results = cur.fetchall()
            logger.debug(f"SQL results: {results}")
            return results


class LLMClient:
    def __init__(self, api_url: str, api_key: str, model: str) -> None:
        self.client = Client(base_url=api_url, api_key=api_key)
        self.model = model

    def analyze(self, prompt: str, user_message: str) -> AgentAction:
        resp = self.client.beta.chat.completions.parse(
            model=self.model,
            messages=[{"role": "system", "content": prompt}, {"role": "user", "content": user_message}],
            response_format=AgentAction,
        )
        raw = resp.choices[0].message.content
        logger.debug(f"Raw LLM response: {raw}")
        action = resp.choices[0].message.parsed
        logger.info(f"LLM reasoning: {action.reasoning}")
        logger.debug(f"Parsed AgentAction: {action.model_dump_json()}")
        return action


class SQLAgent:
    def __init__(self, api_url: str, api_key: str, model: str) -> None:
        self.db = DatabaseClient()
        self.llm = LLMClient(api_url, api_key, model)

    def build_prompt(self) -> str:
        tables = self.db.get_tables()
        schema = "\n\n".join(f"Table `{t}`:\n{self.db.get_schema(t)}" for t in tables)
        return (
            "Ты — AI-ассистент, генерирующий оптимизированные SQL-запросы SELECT на основе пользовательских запросов.\n"
            "Ниже — схема базы данных PostgreSQL:\n\n"
            f"{schema}\n\n"
            "Правила распознавания ролей и специальностей:\n"
            "- Вычленяй любые профессии/роли из запроса (devops, frontend, backend и др.) и нормализуй их.\n"
            "- Учитывай вариации написания: регистр, пробелы, дефисы, транслитерацию.\n\n"
            "Правила генерации ответа:\n"
            "1) Безопасность: если опасно — is_dangerous=true и объяснение.\n"
            "2) Если safe — сгенерируй оптимальный SELECT с учётом индексов.\n"
            "3) Перефразируй запрос как комментарий перед SQL.\n"
            "4) Минимизируй нагрузку: только необходимые поля, без лишних JOIN.\n"
        )

    def handle(self, user_message: str) -> None | str:
        prompt = self.build_prompt()
        action = self.llm.analyze(prompt, user_message)
        if action.is_dangerous:
            return f"Запрос отклонён: {action.reasoning}"
        if action.sql_query:
            return self.db.execute(action.sql_query)
        return "Не удалось сформировать SQL-запрос."


In [6]:
agent = SQLAgent(llm_api_url, api_key, model)
query = "сколько умеет в питон?"
agent.handle(query)

2025-05-06 01:44:32 [INFO] HTTP Request: POST https://llm-api.cibaa.raiffeisen.ru/chat/completions "HTTP/1.1 200 OK"
2025-05-06 01:44:32 [INFO] LLM reasoning: Пользователь хочет узнать, сколько резюме содержат навык 'Python'.
2025-05-06 01:44:32 [INFO] Executing SQL: -- Сколько резюме содержат навык 'Python'
SELECT COUNT(*)
FROM resumes
WHERE 'Python' = ANY (skills);


[RealDictRow([('count', 111)])]