In [23]:
import sys



if 'ipykernel' in sys.modules:

    sys.argv = [sys.argv[0]]

    print('✅ Compatibilidade notebook ativa: pronto para iniciar os prompts.')

✅ Compatibilidade notebook ativa: pronto para iniciar os prompts.


In [26]:
#!/usr/bin/env python3
"""
Anamnese Chat + Controle de Custo (script único)

Agora este script faz tudo em um lugar só:
- roda a anamnese (manual ou com LLM da OpenAI)
- salva resultado e transcript
- calcula e mostra custo estimado/real de tokens
- permite limites de orçamento (requests/tokens)
"""

from __future__ import annotations

import argparse
import json
import os
import re
import sys
from dataclasses import dataclass, asdict
from datetime import datetime, UTC
from typing import Any, Dict, List, Optional, Tuple


# -----------------------------
# Utilitários
# -----------------------------
def now_iso() -> str:
    return datetime.now(UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z")


def prompt_user(question: str, allow_empty: bool = False) -> str:
    while True:
        ans = input(f"{question}\n> ").strip()
        if ans or allow_empty:
            return ans
        print("Por favor, preencha algo (ou digite '-' se não souber).")


def prompt_choice(question: str, choices: List[str]) -> str:
    choices_map = {str(i + 1): c for i, c in enumerate(choices)}
    choices_txt = "\n".join([f"  {i}) {c}" for i, c in choices_map.items()])
    while True:
        ans = input(f"{question}\n{choices_txt}\n> ").strip()
        if ans in choices_map:
            return choices_map[ans]
        if ans in choices:
            return ans
        print("Opção inválida. Selecione um número da lista.")


def parse_int(s: str) -> Optional[int]:
    s = s.strip()
    if s in {"", "-", "nao sei", "não sei"}:
        return None
    s = re.sub(r"[^\d]", "", s)
    if not s:
        return None
    try:
        return int(s)
    except ValueError:
        return None


def parse_float(s: str) -> Optional[float]:
    s = s.strip().lower()
    if s in {"", "-", "nao sei", "não sei"}:
        return None
    s = s.replace(",", ".")
    s = re.sub(r"[^0-9.]", "", s)
    if not s:
        return None
    try:
        return float(s)
    except ValueError:
        return None


def yes_no(question: str) -> bool:
    while True:
        ans = input(f"{question} (s/n)\n> ").strip().lower()
        if ans in {"s", "sim", "y", "yes"}:
            return True
        if ans in {"n", "nao", "não", "no"}:
            return False
        print("Responda com 's' ou 'n'.")


# -----------------------------
# Controle de custo
# -----------------------------
@dataclass
class PriceConfig:
    input_price_per_1m: float
    output_price_per_1m: float


@dataclass
class Usage:
    requests: int = 0
    input_tokens: int = 0
    output_tokens: int = 0


@dataclass
class BudgetLimits:
    max_requests: Optional[int] = None
    max_input_tokens: Optional[int] = None
    max_output_tokens: Optional[int] = None


class CostController:
    def __init__(self, pricing: PriceConfig, limits: Optional[BudgetLimits] = None) -> None:
        self.pricing = pricing
        self.limits = limits or BudgetLimits()
        self.usage = Usage()

    def can_make_request(self, estimated_input_tokens: int = 0, estimated_output_tokens: int = 0) -> tuple[bool, str]:
        next_requests = self.usage.requests + 1
        next_input = self.usage.input_tokens + max(0, estimated_input_tokens)
        next_output = self.usage.output_tokens + max(0, estimated_output_tokens)

        if self.limits.max_requests is not None and next_requests > self.limits.max_requests:
            return False, f"Limite de requests atingido ({self.limits.max_requests})."
        if self.limits.max_input_tokens is not None and next_input > self.limits.max_input_tokens:
            return False, f"Limite de input tokens atingido ({self.limits.max_input_tokens})."
        if self.limits.max_output_tokens is not None and next_output > self.limits.max_output_tokens:
            return False, f"Limite de output tokens atingido ({self.limits.max_output_tokens})."

        return True, "ok"

    def register_request(self, input_tokens: int, output_tokens: int) -> None:
        self.usage.requests += 1
        self.usage.input_tokens += max(0, input_tokens)
        self.usage.output_tokens += max(0, output_tokens)

    def input_cost(self) -> float:
        return (self.usage.input_tokens / 1_000_000) * self.pricing.input_price_per_1m

    def output_cost(self) -> float:
        return (self.usage.output_tokens / 1_000_000) * self.pricing.output_price_per_1m

    def total_cost(self) -> float:
        return self.input_cost() + self.output_cost()

    def report_dict(self) -> Dict[str, Any]:
        return {
            "requests": self.usage.requests,
            "input_tokens": self.usage.input_tokens,
            "output_tokens": self.usage.output_tokens,
            "input_cost_usd": round(self.input_cost(), 8),
            "output_cost_usd": round(self.output_cost(), 8),
            "total_cost_usd": round(self.total_cost(), 8),
        }


# -----------------------------
# Estrutura de dados (JSON)
# -----------------------------
@dataclass
class Anamnese:
    meta: Dict[str, Any]
    perfil: Dict[str, Any]
    objetivos: Dict[str, Any]
    rotina: Dict[str, Any]
    alimentacao: Dict[str, Any]
    saude: Dict[str, Any]
    treino: Dict[str, Any]
    sono_estresse: Dict[str, Any]
    suplementos: Dict[str, Any]
    observacoes: Dict[str, Any]


def empty_anamnese() -> Anamnese:
    return Anamnese(
        meta={"created_at": now_iso(), "version": "1.1"},
        perfil={},
        objetivos={},
        rotina={},
        alimentacao={},
        saude={},
        treino={},
        sono_estresse={},
        suplementos={},
        observacoes={},
    )


# -----------------------------
# OpenAI rewrite + custo por request
# -----------------------------
def openai_rewrite_question(
    original_q: str,
    context: Dict[str, Any],
    llm_mode: str,
    cost: Optional[CostController],
    avg_input_tokens_fallback: int,
    avg_output_tokens_fallback: int,
) -> str:
    if llm_mode != "openai":
        return original_q

    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        print("⚠️ OPENAI_API_KEY não definido. Seguindo sem LLM.")
        return original_q

    try:
        from openai import OpenAI
    except Exception:
        print("⚠️ Pacote openai não disponível. Instale com: pip install openai")
        return original_q

    if cost is not None:
        ok, reason = cost.can_make_request(
            estimated_input_tokens=avg_input_tokens_fallback,
            estimated_output_tokens=avg_output_tokens_fallback,
        )
        if not ok:
            print(f"⚠️ LLM bloqueado por orçamento: {reason}")
            return original_q

    client = OpenAI(api_key=api_key)
    prompt = (
        "Reescreva a pergunta abaixo em português brasileiro, curta e amigável, "
        "sem adicionar novos temas. Responda só com a pergunta final.\n\n"
        f"Pergunta original: {original_q}\n"
    )

    try:
        resp = client.responses.create(
            model="gpt-4.1-nano",
            input=prompt,
            temperature=0.2,
        )

        input_tokens = avg_input_tokens_fallback
        output_tokens = avg_output_tokens_fallback

        usage = getattr(resp, "usage", None)
        if usage is not None:
            input_tokens = int(getattr(usage, "input_tokens", input_tokens) or input_tokens)
            output_tokens = int(getattr(usage, "output_tokens", output_tokens) or output_tokens)

        if cost is not None:
            cost.register_request(input_tokens=input_tokens, output_tokens=output_tokens)

        text = (getattr(resp, "output_text", "") or "").strip()
        return text if text else original_q
    except Exception as e:
        print(f"⚠️ Falha na chamada OpenAI: {e}")
        return original_q


# -----------------------------
# Fluxo da anamnese (manual ou llm)
# -----------------------------
def run_anamnese_flow(
    a: Anamnese,
    llm_mode: str,
    cost: Optional[CostController],
    avg_input_tokens_fallback: int,
    avg_output_tokens_fallback: int,
) -> Tuple[Anamnese, List[Dict[str, str]]]:
    transcript: List[Dict[str, str]] = []
    ctx: Dict[str, Any] = {}

    def ask(q: str, allow_empty: bool = False) -> str:
        q2 = openai_rewrite_question(
            original_q=q,
            context=ctx,
            llm_mode=llm_mode,
            cost=cost,
            avg_input_tokens_fallback=avg_input_tokens_fallback,
            avg_output_tokens_fallback=avg_output_tokens_fallback,
        )
        transcript.append({"role": "assistant", "text": q2})
        ans = prompt_user(q2, allow_empty=allow_empty)
        transcript.append({"role": "user", "text": ans})
        return ans

    mode_txt = "MODO LLM (OpenAI)" if llm_mode == "openai" else "MODO MANUAL"
    print(f"\n=== ANAMNESE ({mode_txt}) ===\nResponda com sinceridade. Se não souber algo, digite '-'.")

    nome = ask("Qual seu nome?")
    idade = parse_int(ask("Qual sua idade (anos)?"))
    sexo = prompt_choice("Sexo biológico:", ["Masculino", "Feminino", "Prefiro não informar"])
    altura = parse_float(ask("Altura (em cm)? Ex: 175"))
    peso = parse_float(ask("Peso (em kg)? Ex: 82.5"))
    a.perfil.update({"nome": nome, "idade": idade, "sexo": sexo, "altura_cm": altura, "peso_kg": peso})
    ctx["perfil"] = a.perfil

    objetivo = ask("Qual seu principal objetivo agora? (ex: emagrecer, ganhar massa, performance, saúde)")
    prazo = ask("Você tem um prazo/meta de data? (ex: 3 meses, até julho) (pode '-' )", allow_empty=True)
    motivacao = ask("Qual o motivo mais forte por trás desse objetivo? (pode '-' )", allow_empty=True)
    a.objetivos.update({
        "objetivo_principal": objetivo,
        "prazo": None if prazo.strip() in {"", "-"} else prazo,
        "motivacao": None if motivacao.strip() in {"", "-"} else motivacao,
    })
    ctx["objetivos"] = a.objetivos

    ocupacao = ask("Como é sua rotina de trabalho/estudo? (ex: escritório, home office, turnos, etc.)")
    passos = parse_int(ask("Em média, quantos passos por dia? (se não souber, '-')"))
    agua = parse_float(ask("Quanto de água por dia (litros)? Ex: 2.0 (ou '-')"))
    a.rotina.update({"ocupacao_rotina": ocupacao, "passos_dia": passos, "agua_litros_dia": agua})

    refeicoes = parse_int(ask("Quantas refeições por dia você costuma fazer? (ou '-')"))
    restricoes = ask("Você tem restrições alimentares? (alergias, religião, preferência, etc.) (pode '-' )", allow_empty=True)
    ultraprocessados = prompt_choice("Com que frequência consome ultraprocessados?", [
        "Raramente", "1-2x/semana", "3-5x/semana", "Quase todo dia"
    ])
    alcool = prompt_choice("Consumo de álcool:", ["Não consumo", "1-2x/mês", "1-2x/semana", "3+ vezes/semana"])
    a.alimentacao.update({
        "refeicoes_dia": refeicoes,
        "restricoes": None if restricoes.strip() in {"", "-"} else restricoes,
        "ultraprocessados": ultraprocessados,
        "alcool": alcool,
    })

    condicoes = ask("Possui alguma condição de saúde? (ex: hipertensão, diabetes, gastrite) (pode '-' )", allow_empty=True)
    medicamentos = ask("Usa medicamentos? Quais? (pode '-' )", allow_empty=True)
    exames = ask("Fez exames recentes? Quais? (pode '-' )", allow_empty=True)
    a.saude.update({
        "condicoes": None if condicoes.strip() in {"", "-"} else condicoes,
        "medicamentos": None if medicamentos.strip() in {"", "-"} else medicamentos,
        "exames_recentes": None if exames.strip() in {"", "-"} else exames,
    })

    treina = yes_no("Você treina atualmente?")
    tipo, freq = None, None
    if treina:
        tipo = ask("Que tipo de treino? (musculação, corrida, cross, etc.)")
        freq = parse_int(ask("Quantos dias por semana?"))
    a.treino.update({"treina_atualmente": treina, "tipo_treino": tipo, "frequencia_semana": freq})

    sono_h = parse_float(ask("Em média, quantas horas você dorme por noite? (ex: 7.5 ou '-')"))
    qualidade_sono = prompt_choice("Qualidade do sono:", ["Boa", "Regular", "Ruim"])
    estresse = prompt_choice("Nível de estresse:", ["Baixo", "Médio", "Alto"])
    a.sono_estresse.update({"sono_horas_noite": sono_h, "qualidade_sono": qualidade_sono, "nivel_estresse": estresse})

    usa_suplementos = yes_no("Você usa suplementos atualmente?")
    lista = ask("Quais suplementos e doses? (ex: creatina 3g/dia)") if usa_suplementos else None
    a.suplementos.update({"usa_suplementos": usa_suplementos, "lista": lista})

    obs = ask("Mais alguma observação importante? (dores, limitações, preferências, etc.) (pode '-' )", allow_empty=True)
    a.observacoes.update({"observacoes_livres": None if obs.strip() in {"", "-"} else obs})

    return a, transcript


# -----------------------------
# CLI
# -----------------------------
def main(argv: Optional[List[str]] = None) -> int:
    parser = argparse.ArgumentParser(description="Anamnese + custo OpenAI em script único.")
    parser.add_argument("--out", default="anamnese_resultado.json", help="Arquivo JSON de saída")
    parser.add_argument("--transcript", default="anamnese_transcript.json", help="Arquivo JSON do transcript")
    parser.add_argument("--llm", choices=["none", "openai"], default="none", help="Usar IA para reescrever perguntas")

    parser.add_argument("--input-price-per-1m", type=float, default=0.15, help="Preço input por 1M tokens (USD)")
    parser.add_argument("--output-price-per-1m", type=float, default=0.60, help="Preço output por 1M tokens (USD)")
    parser.add_argument("--max-requests", type=int, default=None, help="Limite de requests LLM")
    parser.add_argument("--max-input-tokens", type=int, default=None, help="Limite de input tokens")
    parser.add_argument("--max-output-tokens", type=int, default=None, help="Limite de output tokens")
    parser.add_argument("--avg-input-tokens", type=int, default=260, help="Fallback de input tokens por request")
    parser.add_argument("--avg-output-tokens", type=int, default=30, help="Fallback de output tokens por request")

    args, unknown = parser.parse_known_args(argv)
    if unknown:
        print(f"ℹ️ Ignorando argumentos extras: {unknown}")

    pricing = PriceConfig(
        input_price_per_1m=args.input_price_per_1m,
        output_price_per_1m=args.output_price_per_1m,
    )
    limits = BudgetLimits(
        max_requests=args.max_requests,
        max_input_tokens=args.max_input_tokens,
        max_output_tokens=args.max_output_tokens,
    )
    cost = CostController(pricing=pricing, limits=limits)

    a = empty_anamnese()
    a, transcript = run_anamnese_flow(
        a=a,
        llm_mode=args.llm,
        cost=cost,
        avg_input_tokens_fallback=args.avg_input_tokens,
        avg_output_tokens_fallback=args.avg_output_tokens,
    )

    payload = asdict(a)
    payload["custo_openai"] = cost.report_dict()

    with open(args.out, "w", encoding="utf-8") as f:
        json.dump(payload, f, ensure_ascii=False, indent=2)

    with open(args.transcript, "w", encoding="utf-8") as f:
        json.dump({"created_at": payload["meta"]["created_at"], "messages": transcript}, f, ensure_ascii=False, indent=2)

    print("\n✅ Concluído!")
    print(f"- Resultado estruturado: {args.out}")
    print(f"- Transcript da conversa: {args.transcript}")

    print("\n--- RESUMO (rápido) ---")
    print(f"Nome: {payload['perfil'].get('nome')}")
    print(f"Objetivo: {payload['objetivos'].get('objetivo_principal')}")
    print(f"Treina atualmente: {payload['treino'].get('treina_atualmente')}")

    print("\n--- CUSTO OPENAI ---")
    print(json.dumps(payload["custo_openai"], ensure_ascii=False, indent=2))

    return 0


if "ipykernel" in sys.modules:
    main([])
elif __name__ == "__main__":
    raise SystemExit(main())


=== ANAMNESE (MODO MANUAL) ===
Responda com sinceridade. Se não souber algo, digite '-'.
Opção inválida. Selecione um número da lista.

✅ Concluído!
- Resultado estruturado: anamnese_resultado.json
- Transcript da conversa: anamnese_transcript.json

--- RESUMO (rápido) ---
Nome: agnei
Objetivo: emagrecer e definir, ganho de massa
Treina atualmente: True

--- CUSTO OPENAI ---
{
  "requests": 0,
  "input_tokens": 0,
  "output_tokens": 0,
  "input_cost_usd": 0.0,
  "output_cost_usd": 0.0,
  "total_cost_usd": 0.0
}
