In [None]:
import pandas as pd
from pydantic import BaseModel, Field
import time
from typing import Any, Callable, List, Optional, TypeVar, Type
import logging
from openai import OpenAI, RateLimitError
import dotenv
import os

dotenv.load_dotenv()
API_KEY = os.getenv("OR_API_KEY")

## Подготовка вопросов

In [2]:
# def _parse_formatted_question(col: pd.Series):
#     """ Функция, разбивающая отформатированный вопрос на составные части """
#     pattern = r"^\[(?P<tag>[^\]]+)?\]\s*(?P<q_clean>[^@|]+)(?:\s*@\s*(?P<detail>[^|]+))?(?:\s*\|\s*(?P<option>.+))?$"

#     df = col.str.extract(pattern)

#     return df

# def tranc_unique(x: set|list|pd.Series, thr: int = 15):
#     if isinstance(x, pd.Series):
#         unique = sorted(x.dropna().unique())
#     elif isinstance(x, list):
#         unique = sorted(set(x))
#     elif isinstance(x, set):
#         unique = sorted(x)
#     else:
#         raise ValueError
#     if len(unique) > thr:
#         return unique[:thr] + ["..."]
#     return unique

# df = pd.read_parquet(r"D:\WORK\db_nomgnt_latest.parquet", engine="fastparquet")

Фильтр по волне

In [3]:
# wave = "2025-03"

# df = df[df["wave"] == wave]
# df["question"].drop_duplicates().to_excel(f"questions_from_{wave}.xlsx", index=False)

Фильтр по году начала

In [4]:
# YEAR = 2020

# waves = df["wave"].cat.categories.to_series()
# years = waves.str.split("-").str.get(0).astype(int)
# latest_waves = years[years >= YEAR].index

# df = df[df["wave"].isin(latest_waves)]

# QAs = _parse_formatted_question(df["question"].cat.categories.to_series(name="question"))
# ans = df.groupby("question", observed=True).agg(
#     answers=("answer", lambda x: set(x.dropna()))
# )
# QAs = pd.concat([QAs, ans], axis=1, join="inner")

# QAs = QAs.groupby("q_clean", observed=True).agg(
#     details=("detail", lambda x: tranc_unique(x, 30)),
#     options=("option", lambda x: tranc_unique(x, 30)),
#     answers=("answers", lambda x: tranc_unique(set().union(*x)))
# ).reset_index()

# QAs.to_excel(f"questions_from_{YEAR}.xlsx", index=False)

## Ключ, модель и путь к вопросам

In [5]:
qs_path = "questions_from_2025-03.xlsx"

qs = pd.read_excel(qs_path)
client = OpenAI(
  base_url="https://openrouter.ai/api/v1",
  api_key=API_KEY,
)

## Функции

In [None]:
logger = logging.getLogger(__name__)

T = TypeVar("T", bound=BaseModel)


# Повтор с откатом (учитывает rate limit и X-RateLimit-Reset)

def _retry_call(
    fn: Callable[[], Any],
    retries: int = 3,
    base_delay: float = 1.0
) -> Any:
    delay = base_delay
    last_exc: Optional[Exception] = None

    for attempt in range(1, retries + 1):
        try:
            return fn()
        except RateLimitError as e:
            last_exc = e
            wait = delay
            try:
                resp = getattr(e, "response", None)
                ts = resp.headers.get("X-RateLimit-Reset") if resp and hasattr(resp, "headers") else None
                if ts:
                    ts = float(ts) / 1000.0  # мс -> сек
                    wait = max(0.0, ts - time.time())
            except Exception:
                pass

            if attempt < retries:
                logger.warning(f"Rate limit (attempt {attempt}/{retries}); sleep {wait:.1f}s")
                time.sleep(wait)
                delay *= 2
            else:
                raise
        except Exception as e:
            last_exc = e
            if attempt < retries:
                logger.warning(f"{type(e).__name__} (attempt {attempt}/{retries}); sleep {delay:.1f}s")
                time.sleep(delay)
                delay *= 2
            else:
                raise last_exc

# Вывод в формате pydantic схемы

def _get_structured_response(
    client: OpenAI,
    model: str,
    prompt: str,
    response_model: Type[T],
    retries: int = 3,
    base_delay: float = 1.0,
    temperature: float = 0.1,
) -> T:
    def _call():
        resp = client.responses.parse(
            model=model,
            input=prompt,
            text_format=response_model,
            temperature=temperature,
            store=False,
        )
        return resp.output_parsed

    return _retry_call(_call, retries=retries, base_delay=base_delay)

# Pydantic-схемы результата

class ScoredQuestion(BaseModel):
    question: str = Field(..., description="Точная формулировка вопроса из базы")
    reason: str = Field(..., description="Почему этот вопрос полезен для ответа на запрос")
    relevance: float = Field(..., description="Оценка релевантности 0–100")
class RankedQuestions(BaseModel):
    results: List[ScoredQuestion] = Field(
        default_factory=list,
        description="Список релевантных вопросов с объяснениями и оценками"
    )

def extract_questions(
    client: OpenAI,
    model: str,
    user_query: str,
    all_questions: List[str],
    temperature: float = 1.2,
    retries: int = 3,
    base_delay: float = 1.0,
) -> str:
    if not all_questions:
        return "Список вопросов пуст — выбирать нечего."

    questions_block = "\n".join(f"{i+1}. {q}" for i, q in enumerate(all_questions))

    prompt = prompt = f"""
Запрос пользователя: {user_query}

Список доступных вопросов:
{questions_block}

---
Инструкция:

Проанализируй запрос и подбери релевантные вопросы из списка с оценкой релевантности (0-100).

Формат вывода:

**Рассуждение:**
[Краткий анализ запроса: 2-3 предложения о том, что нужно пользователю]

**Рекомендованные вопросы:**

1. **[92/100]** "[Исходная формулировка вопроса]"
   • **Обоснование:** [Почему вопрос напрямую отвечает на запрос]

2. **[78/100]** "[Исходная формулировка вопроса]"
   • **Обоснование:** [Как вопрос связан с ключевой темой]

3. **[65/100]** "[Исходная формулировка вопроса]"
   • **Обоснование:** [Какой контекст или смежную тему раскрывает]

4. **[55/100]** "[Исходная формулировка вопроса]"
   • **Обоснование:** [Чем может быть полезен для понимания]
---
"""

    def _call():
        resp = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "user", "content": prompt},
            ],
            temperature=temperature,
        )
        return resp.choices[0].message.content.strip()

    return _retry_call(_call, retries=retries, base_delay=base_delay)


def unite_questions(
    client: OpenAI,
    model: str,
    extraction_text: str,
    all_questions: List[str],
    max_keep: Optional[int],
    temperature: float = 1,
    retries: int = 3,
    base_delay: float = 1.0,
) -> RankedQuestions:
    """
    Принимает текст из extract_questions и ПОЛНЫЙ список формулировок.
    Возвращает строго структурированный список (Pydantic) с точными формулировками из базы.

    Требуется модель с поддержкой native structured output (Responses API -> parse).
    """
    if not all_questions:
        return RankedQuestions(results=[])

    questions_block = "\n".join(f"{i+1}. {q}" for i, q in enumerate(all_questions))
    cap = "Выбери столько, сколько действительно нужно."
    if isinstance(max_keep, int) and max_keep > 0:
        cap = f"Максимум {max_keep} штук."

    prompt = f"""Твоя задача — сопоставить результаты предыдущего шага с ТЕМИ ЖЕ точными формулировками из базы.

ВХОД:
- Текстовый отчёт предыдущего шага (с рекомендациями и оценками):
\"\"\"{extraction_text}\"\"\"

- Полный список ДОПУСТИМЫХ (ТОЧНЫХ) формулировок вопросов — выбор разрешён ТОЛЬКО из него:
{questions_block}

ТРЕБОВАНИЯ:
- Поле results — список уникальных элементов.
- Для каждого элемента:
  - question — ТОЧНАЯ формулировка из списка выше (обязательно одно из перечисленных).
  - reason — сжато, своими словами, почему он уместен.
  - relevance — число 0–100 (можно скорректировать оценки из отчёта).
- {cap}
- Если соответствия неочевидны, выбери наиболее близкую по смыслу формулировку ИЗ СПИСКА.
- Никаких формулировок вне списка.

Верни только структурированный объект.
"""

    parsed: RankedQuestions = _get_structured_response(
        client=client,
        model=model,
        prompt=prompt,
        response_model=RankedQuestions,
        temperature=temperature,
        retries=retries,
        base_delay=base_delay,
    )

    valid_set = set(all_questions)
    unique = []
    seen = set()
    for item in parsed.results:
        q = (item.question or "").strip()
        if not q or q not in valid_set or q in seen:
            logger.warning(f"Пропущено: '{q[:60]}...' (неточного соответствия или дубликат)")
            continue
        seen.add(q)
        # нормируем диапазон на всякий случай
        score = max(0.0, min(100.0, float(item.relevance)))
        unique.append(ScoredQuestion(question=q, reason=item.reason.strip(), relevance=score))

    return RankedQuestions(results=unique)


## Проверка

In [7]:
questions = qs["question"].to_list()
print(f"Вопросов: {len(questions)}")

Вопросов: 1384


### Уточняем по запросу пользователя какие вопросы нужны

### Вытаскиваем вопросы в свободной форме

In [None]:
user_query = "Посчитай мне размер средних сбережений и норму сбережний среди тех респондентов, у которых они есть."

extract_text = extract_questions(
    client,
    model="tngtech/deepseek-r1t2-chimera:free",
    user_query=user_query,
    all_questions=questions,
)
print(extract_text)

### Рекомендуемые вопросы

1. **[N3] Как бы Вы охарактеризовали величину сбережений Вашей семьи?**  
   Почему: Прямо оценивает размер сбережений, ключевой параметр для расчёта средних значений.  
   Релевантность: 95  

2. **[A26] Каков размер денежных сбережений Вашей семьи (наличные деньги, депозиты в банках, ценные бумаги)?**  
   Почему: Точная количественная оценка сбережений, необходима для расчёта среднего размера.  
   Релевантность: 100  

3. **[N4] Какой % от Вашего дохода Вы сберегаете ежемесячно?**  
   Почему: Прямой показатель нормы сбережений — основная цель анализа.  
   Релевантность: 100  

4. **[Q11V2] К какому из следующих интервалов Вы могли бы отнести доход на одного члена Вашей семьи в месяц в данный момент?**  
   Почему: Доход — база для расчёта нормы сбережений.  
   Релевантность: 90  

5. **[Family_income_new] К какому из следующих интервалов Вы могли бы отнести месячный доход ВАШЕЙ СЕМЬИ?**  
   Почему: Семейный доход влияет на сберегательную способность. 

### Вытаскиваем вопросы с жесткой привязкой к модели Pydantic

In [16]:
ranked = unite_questions(
    client,
    model="meta-llama/llama-4-maverick:free",
    # model="meta-llama/llama-3.3-8b-instruct:free",
    extraction_text=extract_text,
    all_questions=questions,
    max_keep=100
)
print(ranked.model_dump())

Пропущено: '[N47] Есть ли у Вас непогашенный кредит?...' (неточного соответствия или дубликат)
Пропущено: '[Q61] Как Вы оцениваете изменения в Вашем личном материально...' (неточного соответствия или дубликат)
Пропущено: '[Q62] Каковы ожидания в отношении Вашего личного материально...' (неточного соответствия или дубликат)


{'results': [{'question': '[Q2] Сколько Вам лет?', 'reason': 'Возраст — важный сегментирующий фактор для анализа сбережений.', 'relevance': 65.0}, {'question': '[Q11V2] К какому из следующих интервалов Вы могли бы отнести доход на одного члена Вашей семьи в месяц в данный момент?', 'reason': 'Доход — база для расчёта нормы сбережений.', 'relevance': 90.0}, {'question': '[Family_income_new] К какому из следующих интервалов Вы могли бы отнести месячный доход ВАШЕЙ СЕМЬИ (доход членов семьи, проживающих в одной квартире)?', 'reason': 'Семейный доход влияет на сберегательную способность.', 'relevance': 85.0}, {'question': '[N3] Как бы Вы охарактеризовали величину сбережений Вашей семьи?', 'reason': 'Прямо оценивает размер сбережений, ключевой параметр для расчёта средних значений.', 'relevance': 95.0}, {'question': '[A26] Каков размер денежных сбережений Вашей семьи (наличные деньги, депозиты в банках, ценные бумаги)?', 'reason': 'Точная количественная оценка сбережений, необходима для рас