In [2]:
import os
import json
from pathlib import Path

from py_pg_explain_analyzer.db import PgConnection
from py_pg_explain_analyzer.analyzer import PgExplainAnalyzer
from py_pg_explain_analyzer.models import AnalysisRequest
from py_pg_explain_analyzer.sql_parser import SqlAnalyzer
from py_pg_explain_analyzer.sql_metadata_extractor import TableInfoExtractor
from openai import OpenAI


In [19]:
assert False, "Заполни DSN OPENAI_API_KEY"
DSN = "postgresql://user:pass@localhost:5432/dbname"  # строка подключения к Postgres
OPENAI_API_KEY = "sk-...."

AssertionError: Заполни DSN OPENAI_API_KEY

In [None]:
# DSN = "postgresql://user:pass@localhost:5432/dbname"  # строка подключения к Postgres
# OPENAI_API_KEY = "sk-...."

In [None]:
# DSN = "postgresql://user:pass@localhost:5432/dbname"  # строка подключения к Postgres
# OPENAI_API_KEY = "sk-...."


# Флаги EXPLAIN (безопасные значения по умолчанию)
ANALYZE = False    # True — выполнит запрос (на свой риск)
VERBOSE = True
BUFFERS = False # работает вместе с ANALYZE
TIMING  = False # работает вместе с ANALYZE
SETTINGS = True
COSTS = True

COLLECT_METADATA = True          # собирать типы/индексы/pg_stats/UNINDEXED
OUT_DIR = "out"                  # папка для артефактов

# Если хочешь брать DSN из окружения — раскомментируй:
# DSN = os.getenv("PG_DSN", DSN)

# ============================================================================



In [5]:
SQL = """
with found_bad_ads as (select pa.*
                       from ads.ads as aa
                                join parser.ads as pa on pa.item_id = aa.item_id and pa.item_change_at >
                                                                                     aa.item_change_at +
                                                                                     INTERVAL '3 hours 10 minutes'
                                                                                     order by pa.wc_count desc
                                                                                     )

select date(item_change_at), count(*)
from found_bad_ads
group by date(item_change_at)
--order by wc_count desc
;

"""
SQL = SQL.strip()

In [6]:
# 1) Статический разбор SQL: таблицы/колонки + нормализация + линт
s = SqlAnalyzer(SQL)
tables_and_cols = s.get_tables_and_columns()
normalized_sql = s.normalize()
lints = s.lint()

In [7]:
%%time
# 2) (опц.) Метаданные по таблицам и UNINDEXED
metadata = {}
if COLLECT_METADATA:
    try:
        tie = TableInfoExtractor(DSN)
        metadata = tie.analyze(tables_and_cols)
    except Exception as e:
        print(f"[metadata] пропущено из-за ошибки: {e}")
# metadata

CPU times: total: 15.6 ms
Wall time: 104 ms


In [None]:
# 3) EXPLAIN FORMAT JSON + эвристики (БЕЗ LLM) → промежуточный результат
db = PgConnection(DSN)
plan_analyzer = PgExplainAnalyzer(db)  # конструктор без llm
req = AnalysisRequest(
    sql=SQL,
    analyze=ANALYZE,
    verbose=VERBOSE,
    buffers=BUFFERS,
    timing=TIMING,
    settings = SETTINGS,
    costs = COSTS,
)
res = plan_analyzer.analyze_one(req)
res

AnalysisResult(sql="with found_bad_ads as (select pa.*\n                       from ads.ads as aa\n                                join parser.ads as pa on pa.item_id = aa.item_id and pa.item_change_at >\n                                                                                     aa.item_change_at +\n                                                                                     INTERVAL '3 hours 10 minutes'\n                                                                                     order by pa.wc_count desc\n                                                                                     )\n\nselect date(item_change_at), count(*)\nfrom found_bad_ads\ngroup by date(item_change_at)\n--order by wc_count desc\n;", plan=Plan(root=PlanNode(node_type='Aggregate', relation_name=None, alias=None, actual_rows=None, plan_rows=5957.0, actual_total_time_ms=None, index_name=None, filter=None, join_type=None, hash_cond=None, merge_cond=None, recheck_cond=None, startup_cos

In [9]:
# 3b) Сбор единого payload для LLM
def _dc_to_dict(obj):
    # быстрая сериализация dataclass объектов (Issue/Suggestion/IndexCandidate)
    try:
        from dataclasses import asdict, is_dataclass
        return asdict(obj) if is_dataclass(obj) else obj
    except Exception:
        return obj.__dict__ if hasattr(obj, "__dict__") else obj

payload = {
    "sql":               SQL,
    "normalized_sql":    normalized_sql,
    "parser_output":     tables_and_cols,           # из SqlAnalyzer
    "metadata":          metadata or {},            # TableInfoExtractor (если включен)
    # "explain_json":      res.raw_explain_json,      # сырой JSON EXPLAIN
    "heuristics": {
        "issues":            [_dc_to_dict(it) for it in res.issues],
        "suggestions":       [_dc_to_dict(s)  for s  in res.suggestions],
        "index_candidates":  [_dc_to_dict(ic) for ic in res.index_candidates],
        # можно добавить «сплющенный» план:
        "plan_meta":         res.plan.model_dump(),  # pydantic → dict
    },
}
payload

{'sql': "with found_bad_ads as (select pa.*\n                       from ads.ads as aa\n                                join parser.ads as pa on pa.item_id = aa.item_id and pa.item_change_at >\n                                                                                     aa.item_change_at +\n                                                                                     INTERVAL '3 hours 10 minutes'\n                                                                                     order by pa.wc_count desc\n                                                                                     )\n\nselect date(item_change_at), count(*)\nfrom found_bad_ads\ngroup by date(item_change_at)\n--order by wc_count desc\n;",
 'normalized_sql': "WITH found_bad_ads AS (\n  SELECT\n    pa.*\n  FROM ads.ads AS aa\n  JOIN parser.ads AS pa\n    ON pa.item_id = aa.item_id\n    AND pa.item_change_at > aa.item_change_at + INTERVAL '3 hours 10 minutes'\n  ORDER BY\n    pa.wc_count DESC\n)\nSE

In [10]:
# 3b) Сбор единого payload для LLM
def _dc_to_dict(obj):
    # быстрая сериализация dataclass объектов (Issue/Suggestion/IndexCandidate)
    try:
        from dataclasses import asdict, is_dataclass
        return asdict(obj) if is_dataclass(obj) else obj
    except Exception:
        return obj.__dict__ if hasattr(obj, "__dict__") else obj

payload = {
    "sql":               SQL,
    "normalized_sql":    normalized_sql,
    "parser_output":     tables_and_cols,           # из SqlAnalyzer
    "metadata":          metadata or {},            # TableInfoExtractor (если включен)
    # "explain_json":      res.raw_explain_json,      # сырой JSON EXPLAIN
    "heuristics": {
        "issues":            [_dc_to_dict(it) for it in res.issues],
        "suggestions":       [_dc_to_dict(s)  for s  in res.suggestions],
        "index_candidates":  [_dc_to_dict(ic) for ic in res.index_candidates],
        # можно добавить «сплющенный» план:
        "plan_meta":         res.plan.model_dump(),  # pydantic → dict
    },  
}
# payload

In [11]:
%%time
# 3c) LLM-сводка поверх всех данных (ОПЦИОНАЛЬНО)
llm_result = None
# Пример клиента (раскомментируйте при желании запустить LLM):
from openai import OpenAI
llm_client = OpenAI(api_key=OPENAI_API_KEY)
from py_pg_explain_analyzer.llm_summarizer import run_llm
llm_result = run_llm(llm_client, payload)

CPU times: total: 969 ms
Wall time: 9.97 s


In [14]:
llm_result, normalized_sql

({'problems': [{'title': 'Полное сканирование таблицы parser.ads',
    'severity': 'medium',
    'details': 'В плане запроса обнаружено последовательное сканирование таблицы parser.ads (alias pa), что может приводить к высокой нагрузке при большом объеме данных (около 4 млн строк).'},
   {'title': 'Сортировка в подзапросе с ORDER BY',
    'severity': 'low',
    'details': 'В CTE found_bad_ads используется ORDER BY pa.wc_count DESC, но отсутствует индекс по wc_count, что приводит к дополнительной сортировке в памяти.'}],
    'details': 'Колонка wc_count в таблице parser.ads не индексирована, что ухудшает производительность сортировки по ней.'}],
  'recommendations': [{'title': 'Добавить составной индекс для ускорения соединения и фильтрации',
    'actions': [{'sql': 'CREATE INDEX ON parser.ads (item_id, item_change_at);',
      'note': 'Индекс поможет ускорить соединение по item_id и фильтрацию по item_change_at > aa.item_change_at + интервал.'}]},
   {'title': 'Добавить индекс по wc_co