
# Relatório Individual — v8
- Une a estrutura da v6/v7.2 com o gerador de texto da v7.1.
- Se os helpers da v7.1 não forem detectados, usa o texto heurístico completo.


In [None]:

# === Config ===
SPREADSHEET_INPUT_ID  = "COLOQUE_AQUI"   # planilha do site (Leads_Clean)
SPREADSHEET_OUTPUT_ID = "COLOQUE_AQUI"   # planilha destino das abas REL_*

# Sessão alvo (defina um). Se ambos None, usa a última selection_final.
SESSION_EFFECTIVE = None                 # ex.: "1ba80cbd-..."
EVENT_ID         = None                  # ex.: "evt_..."

# Catálogo com atributos
CAT_PATHS = [
    "data/essencias_88_enriquecido.json",
    "/content/data/essencias_88_enriquecido.json",
]

# Texto
TEXT_MODE    = "heuristic"  # "heuristic" | "llm" | "llm_editor" | "v7_1"
LLM_PROVIDER = "openai"
LLM_MODEL    = "gpt-4o-mini"
LLM_API_KEY  = None
LLM_API_URL  = None

# Gravação mínima
TEXT_ONLY = False           # True => só REL_TEXT e REL_DETALHES_TODAS

# Abas
TAB_OVERVIEW    = "REL_OVERVIEW"
TAB_PRE_POS     = "REL_PRE_POS"
TAB_PRE_NEG     = "REL_PRE_NEG"
TAB_FINAL_POS   = "REL_FINAL_POS"
TAB_FINAL_NEG   = "REL_FINAL_NEG"
TAB_MOVEMENT    = "REL_MOVEMENT"
TAB_COMPARATIVO = "REL_COMPARATIVO"
TAB_RESUMO      = "REL_RESUMO"
TAB_TEMPORAL    = "REL_TEMPORAL"
TAB_PRODUTO     = "REL_PRODUTO"
TAB_TEXT        = "REL_TEXT"

TAB_DET_PRE_POS = "REL_DETALHES_PRE_POS"
TAB_DET_PRE_NEG = "REL_DETALHES_PRE_NEG"
TAB_DET_FIN_POS = "REL_DETALHES_FINAL_POS"
TAB_DET_FIN_NEG = "REL_DETALHES_FINAL_NEG"
TAB_DET_ALL     = "REL_DETALHES_TODAS"
TAB_ATTRS       = "REL_ATTRS"
TAB_ATTRS_STAGE = "REL_ATTRS_STAGE"


In [None]:
%pip -q install gspread gspread_dataframe pandas requests

In [None]:

# === Autenticação e abertura ===
import gspread, json, pandas as pd, numpy as np, unicodedata, re, requests, os
from gspread_dataframe import get_as_dataframe, set_with_dataframe

SCOPES = ["https://www.googleapis.com/auth/spreadsheets",
          "https://www.googleapis.com/auth/drive"]

try:
    from google.colab import auth as colab_auth
    colab_auth.authenticate_user()
    import google.auth
    creds, _ = google.auth.default(scopes=SCOPES)
    gc = gspread.authorize(creds)
except Exception as e:
    raise SystemExit(f"Falha de autenticação: {e}")

assert SPREADSHEET_INPUT_ID  != "COLOQUE_AQUI",  "Defina SPREADSHEET_INPUT_ID."
assert SPREADSHEET_OUTPUT_ID != "COLOQUE_AQUI", "Defina SPREADSHEET_OUTPUT_ID."

ss_in  = gc.open_by_key(SPREADSHEET_INPUT_ID)
ss_out = gc.open_by_key(SPREADSHEET_OUTPUT_ID)
print("OK: planilhas abertas.")


In [None]:

# === Utils ===
from pathlib import Path
from collections import Counter, defaultdict

def _norm(s:str)->str:
    s = str(s or "").strip().lower()
    s = "".join(c for c in unicodedata.normalize("NFKD", s) if not unicodedata.combining(c))
    s = re.sub(r"[^a-z0-9\s\-\_]", "", s)
    s = re.sub(r"\s+", " ", s).strip()
    return s

def build_alias_map_from_json(path: Path):
    data = json.loads(path.read_text(encoding="utf-8"))
    items = data if isinstance(data, list) else data.get("items", [])
    alias = {}
    for it in items:
        eid = it.get("id") or _norm(it.get("name"))
        names = [it.get("name","")] + it.get("aliases", [])
        for n in names:
            alias[_norm(n)] = eid
    return alias

def load_catalog_df():
    for p in CAT_PATHS:
        pth = Path(p)
        if pth.exists():
            data = json.loads(pth.read_text(encoding="utf-8"))
            cat = pd.DataFrame(data if isinstance(data, list) else data.get("items", []))
            if "id" not in cat.columns and "essence_id" in cat.columns: cat["id"] = cat["essence_id"]
            for src,dst in {"cor":"color","chakra":"chakra","camada":"camada","arquetipo":"arquetipo","dominio":"dominio"}.items():
                if dst not in cat.columns and src in cat.columns:
                    cat[dst] = cat[src]
            cat["id_norm"] = cat["id"].astype(str).str.lower()
            keep = [c for c in ["id_norm","color","chakra","camada","arquetipo","dominio"] if c in cat.columns]
            return cat[keep]
    return None

def parse_selection_both(sel):
    parts = [p.strip() for p in str(sel or "").split("|") if p.strip()]
    pos = [p[3:].strip() for p in parts if p.startswith("(+)")]
    neg = [p[3:].strip() for p in parts if p.startswith("(-)")]
    return pos, neg

def write_tab(ss, name, df):
    try:
        ws = ss.worksheet(name); ws.clear()
    except Exception:
        rows = max(100, (len(df)+10) if df is not None else 100)
        ws = ss.add_worksheet(title=name, rows=str(rows), cols="50")
    if df is None or len(df)==0: df = pd.DataFrame([{"info":"sem dados"}])
    set_with_dataframe(ws, df.reset_index(drop=True), include_index=False)


In [None]:

# === Leitura (stage ← kind) ===
def read_leads(spreadsheet, tab="Leads_Clean"):
    ws = spreadsheet.worksheet(tab)
    df = get_as_dataframe(ws, evaluate_formulas=True, header=0).dropna(how="all")
    if "stage" not in df.columns:
        if "kind" in df.columns:
            k = df["kind"].astype(str).str.lower()
            df["stage"] = k.where(k.isin(["preselection","selection_final"]), "")
        else:
            df["stage"] = ""
    for c in ["stage","mode","tenant_id","team"]:
        if c in df.columns: df[c] = df[c].astype(str).str.lower()
    df = df[df["stage"].isin(["preselection","selection_final"])].copy()
    if "timestamp_local" in df.columns:
        df["timestamp_local"] = pd.to_datetime(df["timestamp_local"], errors="coerce")
    if "selection_count" in df.columns:
        df["selection_count"] = pd.to_numeric(df["selection_count"], errors="coerce").fillna(0).astype(int)
    return df

raw = read_leads(ss_in)
print("Linhas válidas:", len(raw))


In [None]:

# === Resolver sessão ===
def resolve_session(df, SESSION_EFFECTIVE=None, EVENT_ID=None):
    if SESSION_EFFECTIVE:
        s = str(SESSION_EFFECTIVE).strip()
        if (df["session_effective"]==s).any():
            return s
        if "event_id_linked" in df.columns and (df["event_id_linked"]==s).any():
            return df.loc[df["event_id_linked"]==s, "session_effective"].iloc[0]
    if EVENT_ID and "event_id_linked" in df.columns:
        e = str(EVENT_ID).strip()
        if (df["event_id_linked"]==e).any():
            return df.loc[df["event_id_linked"]==e, "session_effective"].iloc[0]
    cand = df[df["stage"]=="selection_final"].sort_values("timestamp_local").tail(1)
    if len(cand):
        return cand["session_effective"].iloc[0]
    raise ValueError("Nenhuma sessão encontrada. Defina SESSION_EFFECTIVE ou EVENT_ID.")

SESS = resolve_session(raw, SESSION_EFFECTIVE, EVENT_ID)
print("Sessão alvo:", SESS)


In [None]:

# === Extrair, normalizar e enriquecer ===
from pathlib import Path
alias_map = {}
for p in CAT_PATHS:
    pth = Path(p)
    if pth.exists():
        alias_map = build_alias_map_from_json(pth); break
CAT = load_catalog_df()

d = raw[raw["session_effective"]==SESS].copy()
pre  = d[d["stage"]=="preselection"].sort_values("timestamp_local").tail(1)
fin  = d[d["stage"]=="selection_final"].sort_values("timestamp_local").tail(1)

def expand_rows(row, stage):
    pos, neg = parse_selection_both(row.get("selection"))
    recs = []
    for name in pos: recs.append({"stage": stage, "valence":"positive", "essence_name": name})
    for name in neg: recs.append({"stage": stage, "valence":"negative", "essence_name": name})
    return pd.DataFrame.from_records(recs) if recs else pd.DataFrame(columns=["stage","valence","essence_name"])

pre_rows = expand_rows(pre.iloc[0] if len(pre) else {}, "preselection")
fin_rows = expand_rows(fin.iloc[0] if len(fin) else {}, "selection_final")

def normalize_names(df_names):
    if df_names is None or df_names.empty: return df_names
    if alias_map:
        df_names = df_names.assign(essence_id=df_names["essence_name"].map(lambda x: alias_map.get(_norm(x), _norm(x))))
    else:
        df_names = df_names.assign(essence_id=df_names["essence_name"].map(_norm))
    return df_names

pre_rows  = normalize_names(pre_rows)
fin_rows  = normalize_names(fin_rows)

pre_pos   = pre_rows[pre_rows["valence"]=="positive"][["essence_id","essence_name"]]
pre_neg   = pre_rows[pre_rows["valence"]=="negative"][["essence_id","essence_name"]]
final_pos = fin_rows[fin_rows["valence"]=="positive"][["essence_id","essence_name"]]
final_neg = fin_rows[fin_rows["valence"]=="negative"][["essence_id","essence_name"]]

preP, preN = set(pre_pos["essence_id"]), set(pre_neg["essence_id"])
finP, finN = set(final_pos["essence_id"]), set(final_neg["essence_id"])
rows = []
for e in sorted(preP | preN | finP | finN):
    rows.append({
        "essence_id": e,
        "pre":  "+" if e in preP else ("-" if e in preN else ""),
        "final":"+"" if e in finP else ("-" if e in finN else ""),
        "pp": int(e in preP and e in finP),
        "pn": int(e in preP and e not in finP),
        "np": int(e in preN and e in finP),
        "nn": int(e in preN and e in finN),
    })
comparativo = pd.DataFrame(rows)
mov_cols = ["pp","pn","np","nn"]
comparativo["movement"] = comparativo.apply(lambda r: "++" if r["pp"] else ("+-" if r["pn"] else ("-+" if r["np"] else ("--" if r["nn"] else (r["pre"]+'0' if r["pre"] else ('0'+r["final"] if r["final"] else '0'))))), axis=1)
movement = comparativo[["essence_id"] + mov_cols + ["movement"]].sort_values(mov_cols, ascending=[False, True, True, True])

meta_cols = ["timestamp_local","tenant_id","team","mode","selection_count","name","email","phone","origin","utm_source","utm_medium","utm_campaign"]
overview = d.sort_values("timestamp_local").tail(1)
overview = overview[[c for c in meta_cols if c in overview.columns]].copy()


In [None]:

# === Detalhes por flor com atributos ===
def enrich_with_attrs(df_tab):
    if df_tab is None or df_tab.empty: return df_tab
    out = df_tab.copy()
    out["id_norm"] = out["essence_id"].astype(str).str.lower()
    if CAT is not None: out = out.merge(CAT, on="id_norm", how="left")
    cols_pref = ["stage","valence","essence_id","essence_name","color","chakra","camada","arquetipo","dominio"]
    cols = [c for c in cols_pref if c in out.columns] + [c for c in out.columns if c not in cols_pref+["id_norm"]]
    return out[cols]

pre_pos_e = enrich_with_attrs(pre_rows[pre_rows["valence"]=="positive"])
pre_neg_e = enrich_with_attrs(pre_rows[pre_rows["valence"]=="negative"])
fin_pos_e = enrich_with_attrs(fin_rows[fin_rows["valence"]=="positive"])
fin_neg_e = enrich_with_attrs(fin_rows[fin_rows["valence"]=="negative"])

det_all = pd.concat([pre_pos_e, pre_neg_e, fin_pos_e, fin_neg_e], ignore_index=True)
stage_order = {"preselection":0, "selection_final":1}
val_order = {"positive":0, "negative":1}
if len(det_all):
    det_all = det_all.assign(_s=det_all["stage"].map(stage_order), _v=det_all["valence"].map(val_order)) \
                     .sort_values(["_s","_v","essence_name"]).drop(columns=["_s","_v"])


In [None]:

# === Contagens de atributos ===
def count_attrs(df, kind_label):
    rows = []
    if df is None or len(df)==0: return pd.DataFrame(columns=["kind","attr","value","count"])
    for attr in ["color","chakra","camada","arquetipo","dominio"]:
        if attr in df.columns:
            cnt = Counter(df[attr].dropna().astype(str).tolist())
            for k,v in cnt.items(): rows.append({"kind": kind_label, "attr": attr, "value": k, "count": v})
    return pd.DataFrame(rows)

REL_ATTRS_STAGE = pd.concat([
    count_attrs(pre_pos_e,   "pre_pos"),
    count_attrs(pre_neg_e,   "pre_neg"),
    count_attrs(fin_pos_e,   "final_pos"),
    count_attrs(fin_neg_e,   "final_neg"),
], ignore_index=True)

def top3_overall(df_counts):
    if df_counts is None or len(df_counts)==0: return pd.DataFrame([{"attr":"info","value":"sem dados"}])
    rows = []
    for attr in ["color","chakra","camada","arquetipo","dominio"]:
        sub = df_counts[df_counts["attr"]==attr].groupby("value")["count"].sum().sort_values(ascending=False).head(3)
        if len(sub): rows.append({"attr": f"{attr}_top3", "value": ", ".join([f"{k}({v})" for k,v in sub.items()])})
    return pd.DataFrame(rows) if rows else pd.DataFrame([{"attr":"info","value":"sem dados"}])

REL_ATTRS = top3_overall(REL_ATTRS_STAGE)


### Helpers da v7.1

In [None]:
import os, json, re, unicodedata
from typing import List, Dict, Any, Tuple

CATALOG_PATHS = [
    "/mnt/data/essencias_88_enriquecido.json",
    "/mnt/data/essencias_88.json",
]

def _slugify(name: str) -> str:
    s = unicodedata.normalize("NFKD", name)
    s = "".join(c for c in s if not unicodedata.combining(c))
    s = re.sub(r"[^a-zA-Z0-9]+", "_", s).strip("_").lower()
    return s

def load_catalog() -> Dict[str, Dict[str, Any]]:
    for p in CATALOG_PATHS:
        if os.path.exists(p):
            with open(p, "r", encoding="utf-8") as f:
                items = json.load(f)
            return {it["id"]: it for it in items}
    return {}

CATALOG = load_catalog()

def map_name_to_id(name: str) -> str:
    slug = _slugify(name)
    if slug in CATALOG:
        return slug
    for k, m in CATALOG.items():
        if m.get("nome","").strip().lower() == name.strip().lower():
            return k
    for k, m in CATALOG.items():
        if _slugify(m.get("nome","")) == slug:
            return k
    return slug

def safe_meta(eid: str) -> Dict[str, Any]:
    m = CATALOG.get(eid)
    if m: return m
    return {"id": eid, "nome": eid, "camada": [], "familia": "", "cor": [], "chakras": [], "arquetipos": [], "funcoes": []}

def parse_selection(selection_str: str) -> List[Tuple[str,int]]:
    if not isinstance(selection_str, str) or not selection_str.strip():
        return []
    parts = [p.strip() for p in selection_str.split("|")]
    out = []
    for p in parts:
        m = re.match(r"^\((\+|\-)\)\s*(.+?)\s*$", p)
        if not m:
            sign = +1; name = p
        else:
            sign = +1 if m.group(1) == "+" else -1
            name = m.group(2)
        eid = map_name_to_id(name)
        out.append((eid, sign))
    return out



REL_FAMILIAS = set(["Kangaroo Paw"])
TRIGGER_FAMILIAS = set(["Triggerplant"])
PROF_IDS = set(["christmas_tree","balga_blackboy","goddess_grasstree","macrozamia","ursinia"])
C5_IDS = set(["wa_smokebush","leafless_orchid","red_beak_orchid","pink_trumpet_flower","purple_enamel_orchid"])

def _sum_list(vals):
    acc = {}
    for v in vals: acc[v] = acc.get(v,0)+1
    return acc

def build_metrics(ids):
    metas = [safe_meta(x) for x in ids]
    camadas = [c for m in metas for c in (m.get("camada") or [])]
    cores   = [c for m in metas for c in (m.get("cor") or [])]
    chak    = [c for m in metas for c in (m.get("chakras") or [])]
    Ccount = _sum_list(camadas)
    coreCount = _sum_list(cores)
    chCount = _sum_list(chak)
    IR = sum(1 for m in metas if "H" in (m.get("camada") or []) or (m.get("familia")=="Kangaroo Paw"))
    IP = sum(1 for m in metas if "P" in (m.get("camada") or []) or (m.get("id") in PROF_IDS))
    IE = sum(1 for m in metas if (m.get("familia") in TRIGGER_FAMILIAS) or (m.get("id") in C5_IDS) or ("C5" in (m.get("camada") or [])))
    return {
        "I_C1": Ccount.get("C1",0),"I_C2": Ccount.get("C2",0),"I_C3": Ccount.get("C3",0),"I_C4": Ccount.get("C4",0),"I_C5": Ccount.get("C5",0),
        "IR": IR, "IP": IP, "IE": IE, "cores": coreCount, "chakras": chCount, "presentes": ids
    }

def _pick_dom(d):
    if not d: return ""
    return max(d.items(), key=lambda kv: kv[1])[0]



def sugerir_suportes(ids):
    metas = [safe_meta(x) for x in ids]
    def has(pred): return any(pred(m) for m in metas)
    out = []
    if has(lambda m: "liberar_emocoes" in (m.get("funcoes") or []) or "expressao" in (m.get("funcoes") or [])): out.append("Dampiera")
    if has(lambda m: "Ch2" in (m.get("chakras") or []) or "sexual" in (m.get("funcoes") or [])): out.append("Macrozamia")
    if has(lambda m: "memorias_dolorosas" in (m.get("funcoes") or [])): out.append("Illyarrie")
    if has(lambda m: "solucoes_criativas" in (m.get("funcoes") or [])): out.append("Star of Bethlehem")
    if has(lambda m: "renascimento" in (m.get("funcoes") or []) and "trauma" in (m.get("funcoes") or [])): out.append("Menzies Banksia")
    if has(lambda m: "pavor_morte" in (m.get("funcoes") or [])): out.append("Ribbon Pea")
    seen=set(); res=[]
    for x in out:
        if x not in seen:
            res.append(x); seen.add(x)
        if len(res)>=2: break
    return res

def _frase_direcao(m_pos, m_neg):
    if m_pos["I_C1"]>0: return "Há direção de propósito ativa nas escolhas."
    if m_neg["I_C1"]>0: return "Propósito aparece com resistência ou baixa consciência."
    return "A direção de propósito não aparece entre as principais escolhas."

def _frase_gargalo(m_pos, m_neg):
    if m_pos["I_C2"]>=2 and m_pos["I_C3"]>=2: return "Feridas e defesas operam juntas e explicam atrito no cotidiano."
    if m_pos["I_C2"]>=2: return "Feridas ativas pedem espaço de acolhimento antes de ajustes de padrão."
    if m_pos["I_C3"]>=3: return "Padrões de rigidez e controle dominam a adaptação."
    if m_pos["I_C5"]>=1: return "Há sinais de cansaço energético que exigem cadência."
    return ""

def _frase_relacional(ids_pos):
    kpos = sum(1 for eid in ids_pos if safe_meta(eid).get("familia","")=="Kangaroo Paw")
    if kpos>=2: return "Relações estão no centro: pedem refinamento de presença e escuta."
    if kpos==1: return "Relações pedem pequenos ajustes de ritmo e comunicação."
    return ""

def _frase_energia(m_pos, m_neg):
    if m_pos["IE"]>=2: return "Primeiro passo é estabilizar o ritmo para recuperar energia."
    if m_pos["I_C5"]>=1: return "Sinais de oscilação energética aparecem e merecem atenção."
    if m_neg["I_C5"]>=1: return "Reconhecer limites energéticos reduz atrito oculto."
    return ""

def _frase_cor(m_pos):
    dom = _pick_dom(m_pos["cores"]);
    if not dom: return ""
    zeros = [c for c in ["Amarelo","Azul","Vermelho","Verde","Rosa","Violeta","Laranja","Branco","Preto"] if m_pos["cores"].get(c,0)==0]
    if len(zeros)>=3: return f"Predomínio cromático em {dom}. Falta contraste funcional em outras qualidades."
    return f"Predomínio cromático em {dom}."

def _frase_chakra(m_pos):
    dom = _pick_dom(m_pos["chakras"])
    return f"Hotspot energético em {dom}." if dom else ""

def _frase_resistencia(m_neg, ids_neg):
    msgs = []
    if m_neg["I_C1"]>=1: msgs.append("Resistência em declarar ou sustentar propósito (C1).")
    if m_neg["I_C2"]>=2: msgs.append("Resistência em tocar feridas/choques (C2).")
    if m_neg["I_C3"]>=2: msgs.append("Resistência em flexibilizar padrões (C3).")
    if m_neg["I_C5"]>=1: msgs.append("Resistência em reconhecer cansaço e limites (C5).")
    kneg = sum(1 for eid in ids_neg if safe_meta(eid).get("familia","")=="Kangaroo Paw")
    if kneg>=1: msgs.append("Resistência relacional ativa (escuta/presença).")
    return " ".join(msgs)

def _sequencia(m_pos, m_neg):
    if m_pos["IE"]>=2 or m_neg["I_C5"]>=1:
        return [
            "Cadencie sono e tarefas por 7 dias (ritmo > volume).",
            "Diário de energia: 2 anotações/dia sobre picos e quedas.",
            "Ajuste 1 hábito de recuperação rápida após esforço."
        ]
    if (m_pos["I_C2"]>=2 and m_pos["I_C3"]>=2) or (m_neg["I_C2"]>=2 and m_neg["I_C3"]>=1):
        return [
            "Nomeie 1 ferida recorrente que aciona defesa típica.",
            "Pratique um gesto de flexibilização quando o gatilho surgir.",
            "Registre um micro-avanço semanal de escolha consciente."
        ]
    return [
        "Defina uma intenção simples para os próximos 7 dias.",
        "Observe onde surge atrito e descreva o contexto.",
        "Escolha um ajuste pequeno e repita por 3 dias."
    ]

def _chakras_arquetipos_bloco(ids_pos, ids_neg):
    def collect(ids):
        ch, arq = {}, {}
        for eid in ids:
            m = safe_meta(eid)
            for c in m.get("chakras") or []:
                ch[c] = ch.get(c,0)+1
            for a in m.get("arquetipos") or []:
                arq[a] = arq.get(a,0)+1
        return ch, arq
    ch_pos, arq_pos = collect(ids_pos)
    ch_neg, arq_neg = collect(ids_neg)
    return {
        "chakras": {"positivos": ch_pos, "negativos": ch_neg},
        "arquetipos": {"positivos": arq_pos, "negativos": arq_neg},
    }

def _relatorio_texto(m_pos, m_neg, ids_pos, ids_neg):
    blocos = [
        _frase_direcao(m_pos, m_neg),
        _frase_gargalo(m_pos, m_neg),
        _frase_relacional(ids_pos),
        _frase_energia(m_pos, m_neg),
        _frase_cor(m_pos),
        _frase_chakra(m_pos),
        _frase_resistencia(m_neg, ids_neg),
    ]
    textoCliente = " ".join([b for b in blocos if b]).strip()
    textoPro = (
        f"Camadas(+): C1={m_pos['I_C1']} C2={m_pos['I_C2']} C3={m_pos['I_C3']} C4={m_pos['I_C4']} C5={m_pos['I_C5']}. "
        f"Rel={m_pos['IR']} Prof={m_pos['IP']} Ene={m_pos['IE']}. "
        f"Camadas(−): C1={m_neg['I_C1']} C2={m_neg['I_C2']} C3={m_neg['I_C3']} C4={m_neg['I_C4']} C5={m_neg['I_C5']}. "
        + textoCliente
    ).strip()
    return textoCliente, textoPro

def interpret_selection_string_v7(selection_str: str) -> dict:
    pairs = parse_selection(selection_str)
    top7 = pairs[:7]  # Final = Top 7
    ids_pos = [eid for eid, s in top7 if s>0]
    ids_neg = [eid for eid, s in top7 if s<0]
    m_pos = build_metrics(ids_pos)
    m_neg = build_metrics(ids_neg)
    textoCliente, textoPro = _relatorio_texto(m_pos, m_neg, ids_pos, ids_neg)
    seq = _sequencia(m_pos, m_neg)
    suportes = sugerir_suportes(ids_pos + ids_neg) or None
    extra = _chakras_arquetipos_bloco(ids_pos, ids_neg)
    return {
        "final": {
            "resumo": {"textoCliente": textoCliente, "textoPro": textoPro, "sequencia": seq, "suportes": suportes},
            "metricas": {"positivas": m_pos, "negativas": m_neg},
            "ids": {"positivas": ids_pos, "negativas": ids_neg},
            "chakras": extra["chakras"],
            "arquetipos": extra["arquetipos"],
        },
        "parsed": top7
    }

In [None]:

# === Texto completo (v7.1 se disponível; fallback heurístico v6/v7.2) ===
from collections import defaultdict, Counter

def _list_str(lst):
    return ", ".join(lst) if lst else "—"

def _fmt_camadas(cnt):
    out = []
    for c in ["C1","C2","C3","C4","C5"]:
        out.append(f"{c}={cnt.get(c,0)}")
    return ", ".join(out)

# Preparos para heurística fallback
pos_src = fin_pos_e if (fin_pos_e is not None and len(fin_pos_e)) else pre_pos_e
neg_src = pd.concat([pre_neg_e, fin_neg_e], ignore_index=True) if (pre_neg_e is not None or fin_neg_e is not None) else None

# Camadas
cam_pos_cnt = Counter(pos_src["camada"].dropna().astype(str)) if pos_src is not None and "camada" in pos_src.columns else Counter()
cam_neg_cnt = Counter(neg_src["camada"].dropna().astype(str)) if neg_src is not None and "camada" in neg_src.columns else Counter()

# Domínios (+) com fallback por chakra
def _infer_domain_row(row):
    dval = str(row.get("dominio","")).strip().lower()
    if dval:
        if dval.startswith("rel"): return "Rel"
        if dval.startswith("prof"): return "Prof"
        if dval.startswith("ene"): return "Ene"
    ch = str(row.get("chakra","")).strip().lower()
    if ch in {"ch4","4"}: return "Rel"
    if ch in {"ch3","3","ch6","6"}: return "Prof"
    if ch in {"ch1","1","ch5","5"}: return "Ene"
    return "Prof"

if pos_src is not None and len(pos_src):
    dom_series  = pos_src.apply(_infer_domain_row, axis=1)
    dom_counts  = Counter(dom_series)
else:
    dom_counts  = Counter()

def _color_line(df):
    if df is None or df.empty or "color" not in df.columns:
        return "Cores(+): —"
    bucket = defaultdict(list)
    for _, r in df.dropna(subset=["essence_name"]).iterrows():
        cor = str(r.get("color","")).strip().lower()
        if not cor: continue
        bucket[cor].append(str(r["essence_name"]))
    parts = []
    for cor, names in sorted(bucket.items()):
        uniq = sorted(set(names))
        parts.append(f"{cor}={len(uniq)} — {', '.join(uniq)}")
    return "Cores(+): " + "; ".join(parts) + "."

colors_line = _color_line(pos_src)

def _counts_map(df, key_col, name_col):
    m = defaultdict(list)
    if df is None or df.empty: return m
    g = df.dropna(subset=[key_col])[ [key_col, name_col] ].astype(str).groupby(key_col)
    for k, sub in g:
        m[str(k)] = sorted(sub[name_col].unique().tolist())
    return m

chak_pos_map = _counts_map(pos_src, "chakra", "essence_name") if pos_src is not None else {}
chak_neg_map = _counts_map(neg_src, "chakra", "essence_name") if neg_src is not None else {}
chak_pos_line = "; ".join([f"{k}×{len(v)} — {', '.join(v)}" for k,v in chak_pos_map.items()]) if chak_pos_map else "—"
chak_neg_line = "; ".join([f"{k}×{len(v)} — {', '.join(v)}" for k,v in chak_neg_map.items()]) if chak_neg_map else "—"
chak_pos_cnt = {k: len(v) for k,v in chak_pos_map.items()}
chak_neg_cnt = {k: len(v) for k,v in chak_neg_map.items()}
chakra_hotspot_pos = max(chak_pos_cnt, key=chak_pos_cnt.get) if chak_pos_cnt else None
chakra_hotspot_neg = max(chak_neg_cnt, key=chak_neg_cnt.get) if chak_neg_cnt else None
hotspot_line  = "Hotspot(+): " + (str(chakra_hotspot_pos) if chakra_hotspot_pos else "—")
if chakra_hotspot_neg: hotspot_line += f"; Hotspot(−): {chakra_hotspot_neg}"

def _sem_arq(df):
    if df is None or df.empty: return []
    m = df[df["arquetipo"].isna() | (df["arquetipo"].astype(str).str.strip()=="")]
    return sorted(m["essence_name"].dropna().astype(str).unique().tolist())

arq_pos_map  = _counts_map(pos_src, "arquetipo", "essence_name") if pos_src is not None else {}
arq_neg_map  = _counts_map(neg_src, "arquetipo", "essence_name") if neg_src is not None else {}
arq_pos_sem = _sem_arq(pos_src); arq_neg_sem = _sem_arq(neg_src)

# Gargalo simples
pre_pos_ct = len(pre_pos); kept = len(set(pre_pos["essence_id"]) & set(final_pos["essence_id"]))
retencao = round(100*kept/max(1, pre_pos_ct), 2) if pre_pos_ct else None
c5n, c3n, c4n = cam_neg_cnt.get("C5",0), cam_neg_cnt.get("C3",0), cam_neg_cnt.get("C4",0)
gargalo = None
if c5n >= 2 or (retencao is not None and retencao < 50):
    gargalo = "energia primeiro"
elif c3n >= 2:
    gargalo = "foco cognitivo primeiro"
elif (c4n >= 1) and (chakra_hotspot_pos in {"Ch4","4"}):
    gargalo = "relacional primeiro"

seq = []
if   gargalo == "energia primeiro":
    seq = ["Cadenciar sono e tarefas por 7 dias (ritmo > volume).","Diário de energia 2×/dia: pico/queda + contexto.","Um hábito rápido de recuperação após esforço."]
elif gargalo == "foco cognitivo primeiro":
    seq = ["Intenção única para 7 dias.","Registrar atrito cognitivo e contexto.","Fechar um tema antes de abrir outro."]
elif gargalo == "relacional primeiro":
    seq = ["Micro-acordos de comunicação com 1 pessoa-chave.","Check-in breve diário focado em impedimentos.","Feedback curto ao final da semana."]
else:
    seq = ["Cadenciar sono e tarefas por 7 dias (ritmo > volume).","Diário de energia 2×/dia: pico/queda + contexto.","Escolher 1 tema e concluir antes de abrir outro."]
seq_lines = "\n".join([f"- {s}" for s in seq])

def suggest_supports():
    if pos_src is None or pos_src.empty: return []
    if chakra_hotspot_pos and "chakra" in pos_src.columns:
        f = pos_src[pos_src["chakra"].astype(str).str.lower() == str(chakra_hotspot_pos).lower()]
        if len(f) == 0: f = pos_src
    else:
        f = pos_src
    return sorted(f["essence_name"].dropna().astype(str).unique().tolist())[:2]
suportes = suggest_supports()

camadas_pos_str = _fmt_camadas(cam_pos_cnt)
camadas_neg_str = _fmt_camadas(cam_neg_cnt) if sum(cam_neg_cnt.values())>0 else "C1=0, C2=0, C3=0, C4=0, C5=0"
gargalo_line = f"Gargalo: {gargalo}." if gargalo else "Gargalo: —."
dom_line = f"Domínios(+): Rel={dom_counts.get('Rel',0)} Prof={dom_counts.get('Prof',0)} Ene={dom_counts.get('Ene',0)}."

# Heurístico base
texto_heuristico = (
f"Cliente\nPreparação suficiente para trabalhar o tema.\n\n"
f"Profissional\nCamadas(+): {camadas_pos_str}.\n{dom_line}\nCores(+): {colors_line}\n"
f"Camadas(−): {camadas_neg_str}.\n{gargalo_line}\n\n"
f"Chakras\nAtivos (+): {chak_pos_line}\nResistência (−): {chak_neg_line}\n{hotspot_line}\n\n"
f"Arquétipos\nAtivos (+): " + ("; ".join([f"{k}×{len(v)} — {', '.join(v)}" for k,v in arq_pos_map.items()]) if arq_pos_map else "—") + "\n"
+ f"Resistência (−): " + ("; ".join([f"{k}×{len(v)} — {', '.join(v)}" for k,v in arq_neg_map.items()]) if arq_neg_map else "—") + "\n"
+ f"Sem arquétipo mapeado: " + _list_str(sorted(set(arq_pos_sem + arq_neg_sem))) + "\n\n"
+ "Sequência\n" + seq_lines + "\n\n"
+ "Suportes\n" + (_list_str(suportes) if suportes else "Sem obrigatório; se desejar, 1 essência do final como guia.")
)

# v7.1 path: usar helper interpret_selection_string_v7 se existir
texto_final = None
if TEXT_MODE == "v7_1" or "interpret_selection_string_v7" in globals():
    try:
        sel = (fin.iloc[0]["selection"] if len(fin) else (pre.iloc[0]["selection"] if len(pre) else ""))
        if sel:
            out = interpret_selection_string_v7(sel)  # esperado na v7.1
            # aceitar múltiplos formatos de retorno
            if isinstance(out, str):
                texto_final = out
            elif isinstance(out, dict):
                # tentar o esquema final.resumo
                try:
                    resumo = out.get("final", out).get("resumo", out.get("resumo", {}))
                    tc = resumo.get("textoCliente","")
                    tp = resumo.get("textoPro","")
                    seqL = resumo.get("sequencia", [])
                    supL = resumo.get("suportes", [])
                    texto_final = (
                        "Cliente\n" + (tc or "—") + "\n\n"
                        "Profissional\n" + (tp or "—") + "\n\n"
                        "Sequência\n" + ("\n".join([f"- {s}" for s in seqL]) if seqL else "—") + "\n\n"
                        "Suportes\n" + (", ".join(supL) if supL else "—")
                    )
                except Exception:
                    # tentar chave 'texto'
                    texto_final = str(out.get("texto",""))
    except Exception as e:
        texto_final = None  # cai no heurístico

if not texto_final:
    # usar heurístico completo
    texto_final = texto_heuristico

text_df = pd.DataFrame([{"texto": texto_final}])


In [None]:

# === Auxiliares (temporal/produto/resumo) ===
d2 = raw[raw["session_effective"]==SESS].copy()
if "timestamp_local" in d2.columns: d2 = d2.sort_values("timestamp_local")

temporal_cols = ["timestamp_local","kind","subject","action","triggerKind","option","price","currency","payment_status","event_id","event_id_linked","hp","locale","ip_address","referer"]
REL_temporal = d2[[c for c in temporal_cols if c in d2.columns]].copy()

prod_cols = ["action","triggerKind","option","price","currency","payment_status","product_id","stripe_customer_id","stripe_payment_intent_id"]
if len(d2): REL_produto = d2.tail(1)[[c for c in prod_cols if c in d2.columns]].copy()
else:       REL_produto = pd.DataFrame(columns=prod_cols)

kept = len(set(pre_pos["essence_id"]) & set(final_pos["essence_id"]))
new_final = len(set(final_pos["essence_id"]) - set(pre_pos["essence_id"]))
drop = len(set(pre_pos["essence_id"]) - set(final_pos["essence_id"]))
conv_neg_to_pos = len(set(pre_neg["essence_id"]) & set(final_pos["essence_id"]))
persist_neg = len(set(pre_neg["essence_id"]) & set(final_neg["essence_id"]))
REL_resumo = pd.DataFrame([
    {"metric":"pre_pos","value":len(pre_pos)},
    {"metric":"pre_neg","value":len(pre_neg)},
    {"metric":"final_pos","value":len(final_pos)},
    {"metric":"final_neg","value":len(final_neg)},
    {"metric":"retencao_positivos","value": round(100*kept/max(1, len(pre_pos)),2) if len(pre_pos)>0 else None},
    {"metric":"novos_positivos","value": new_final},
    {"metric":"+_para_-","value": drop},
    {"metric":"-_para_+","value": conv_neg_to_pos},
    {"metric":"negativos_persistentes","value": persist_neg},
])


In [None]:

# === Gravação ===
# Sempre grava texto e detalhes
write_tab(ss_out, TAB_TEXT, text_df)
write_tab(ss_out, TAB_DET_ALL, det_all)

if not TEXT_ONLY:
    write_tab(ss_out, TAB_DET_PRE_POS, pre_pos_e)
    write_tab(ss_out, TAB_DET_PRE_NEG, pre_neg_e)
    write_tab(ss_out, TAB_DET_FIN_POS, fin_pos_e)
    write_tab(ss_out, TAB_DET_FIN_NEG, fin_neg_e)

    write_tab(ss_out, TAB_OVERVIEW, overview)
    write_tab(ss_out, TAB_PRE_POS, pre_pos)
    write_tab(ss_out, TAB_PRE_NEG, pre_neg)
    write_tab(ss_out, TAB_FINAL_POS, final_pos)
    write_tab(ss_out, TAB_FINAL_NEG, final_neg)
    write_tab(ss_out, TAB_MOVEMENT, movement)
    write_tab(ss_out, TAB_COMPARATIVO, comparativo)
    write_tab(ss_out, TAB_RESUMO, REL_resumo)
    write_tab(ss_out, TAB_TEMPORAL, REL_temporal)
    write_tab(ss_out, TAB_PRODUTO, REL_produto)
    write_tab(ss_out, TAB_ATTRS, REL_ATTRS)
    write_tab(ss_out, TAB_ATTRS_STAGE, REL_ATTRS_STAGE)

print("Concluído v7.3.")
