#**Licença de Uso**


This repository uses a **dual-license model** to distinguish between source code and creative/documental content.

**Code** (Python scripts, modules, utilities):
Licensed under the MIT License.

→ You may freely use, modify, and redistribute the code, including for commercial purposes, provided that you preserve the copyright notice.

**Content** (Jupyter notebooks, documentation, reports, datasets, and generated outputs):
Licensed under the Creative Commons Attribution–NonCommercial 4.0 International License.

→ You may share and adapt the content for non-commercial purposes, provided that proper credit is given to the original author.


**© 2025 Leandro Bernardo Rodrigues**


#**Pré-Configuração**

##**Código de uso único**
Aplicação persistente entre sessões do Google Colab

---
**Uso expecífico para Google Colab.**

**Aviso:** implementação no JupytherHub e GitLab são diferentes.

In [None]:
# @title
#montar o Google Drive e preparar a pasta do projeto
from google.colab import drive
drive.mount('/content/drive', force_remount=True)
import os, subprocess, getpass, pathlib
BASE = "/content/drive/MyDrive/Notebooks"
REPO = "temporal-graph-network"
PROJ = f"{BASE}/{REPO}"
os.makedirs(BASE, exist_ok=True)
%cd "$BASE"

#clonar o repositório existente do GitHub (se ainda não estiver clonado) ===
if not os.path.exists(PROJ):
    GITHUB_URL = "https://github.com/LeoBR84p/temporal-graph-network.git"
    # Dica: use PAT quando o push for necessário; para clone público basta a URL.
    !git clone $GITHUB_URL
else:
    print("Pasta do projeto já existe, seguindo adiante...")
%cd "$PROJ"

#criar pastas utilitárias que você quer manter no projeto ===
#não sobrescreve nada; só cria se não existirem
!mkdir -p notebooks src data output runs configs

#instalar pacotes (na sessão atual) para conseguir configurar os filtros ===
!pip -q install jupytext nbdime nbstripout

#configurar Git/NBDime/Jupytext no *repositório* (persistem em .git/config) ===
#usar --local faz a config ficar gravada em .git/config (persiste no Drive)
!git config --local user.name "Leandro Bernardo Rodrigues"
!git config --local user.email "bernardo.leandro@gmail.com"
!git config --local init.defaultBranch main

# OBS: no Colab, o nbdime com --local pode falhar; use --global nesta sessão
!nbdime config-git --enable --global

#.gitignore e .gitattributes (só criar se não existirem) ===
if not pathlib.Path(".gitignore").exists():
    with open(".gitignore","w") as f:
        f.write("""\
.ipynb_checkpoints/
.DS_Store
Thumbs.db
*.log
*.tmp
# dados/artefatos pesados (não versionar)
data/
output/
runs/
# Python
venv/
__pycache__/
*.pyc
# segredos
.env
*.key
*.pem
*.tok
""")
if not pathlib.Path(".gitattributes").exists():
    with open(".gitattributes","w") as f:
        f.write("*.ipynb filter=nbstripout\n")

#ativar o hook do nbstripout neste repositório (persiste)
!nbstripout --install --attributes .gitattributes

#parear notebooks com .py para diffs legíveis ===
!jupytext --set-formats ipynb,py:percent --sync notebooks/*.ipynb || true

#commit inicial dessas configs locais (se houver algo novo) e push ===
!git add -A
!git status
!git commit -m "chore: setup local (.gitignore/.gitattributes, nbstripout, jupytext config)" || true

#se o remoto já tem README/commits, faça pull --rebase antes do primeiro push
!git pull --rebase origin main || true

#push (ao pedir senha, use seu PAT como senha do Git)
import getpass, subprocess, sys

owner = "LeoBR84p"
repo  = "temporal-graph-network"
clean_url = f"https://github.com/{owner}/{repo}.git"

# 1) Tenta push "normal" (pode falhar por falta de credencial)
push = subprocess.run(["git","push","origin","main"], capture_output=True, text=True)
if push.returncode == 0:
    print("Push concluído sem PAT.")
else:
    print("Primeiro push falhou (provável falta de credenciais). Vamos usar um PAT temporário…")
    # 2) Pede o PAT e testa autenticação antes do push
    token = getpass.getpass("Cole seu GitHub PAT (não será exibido): ").strip()
    # Formato mais compatível: user + token na URL
    # Use seu usuário real do GitHub (case sensitive)
    username = "LeoBR84p"
    auth_url = f"https://{username}:{token}@github.com/{owner}/{repo}.git"

    try:
        # Teste rápido de auth (ls-remote) para ver se o token tem acesso de escrita
        test = subprocess.run(["git","ls-remote", auth_url],
                              capture_output=True, text=True)
        if test.returncode != 0:
            print("Falha ao autenticar com o PAT. Detalhe do erro:")
            print(test.stderr or test.stdout)
            raise SystemExit(1)

        # 3) Troca a URL, faz push e restaura a URL limpa
        subprocess.run(["git","remote","set-url","origin", auth_url], check=True)
        out = subprocess.run(["git","push","origin","main"], capture_output=True, text=True)
        if out.returncode != 0:
            print("Falha no push mesmo com PAT. Detalhe do erro:")
            print(out.stderr or out.stdout)
            raise SystemExit(out.returncode)
        print("Push concluído com PAT.")
    finally:
        subprocess.run(["git","remote","set-url","origin", clean_url], check=False)

##**Código a cada sessão**
---
Aplicação não persistente entre sessões.

Necessário para sincronização e versionamento de alterações no código.

###**Montar e sincronizar**

In [None]:
# @title
#setup por sessão (colab)
from google.colab import drive
import os, time, subprocess, getpass, pathlib, sys

BASE = "/content/drive/MyDrive/Notebooks"
REPO = "temporal-graph-network"
PROJ = f"{BASE}/{REPO}"

def safe_mount_google_drive():
    #monta ou remonta o google drive de forma resiliente
    try:
        drive.mount('/content/drive', force_remount=True)
    except Exception:
        try:
            drive.flush_and_unmount()
        except Exception:
            pass
        time.sleep(1.0)
        drive.mount('/content/drive', force_remount=True)

def safe_chdir(path):
    #usa os.chdir (evita %cd com f-string)
    if not os.path.exists(path):
        raise FileNotFoundError(f"Caminho não existe: {path}")
    os.chdir(path)
    print("Diretório atual:", os.getcwd())

def branch_a_frente():
    #retorna true se head está à frente do upstream (há o que enviar)
    ahead = subprocess.run(
        ["git","rev-list","--left-right","--count","HEAD...@{upstream}"],
        capture_output=True, text=True
    )
    if ahead.returncode != 0:
        status = subprocess.run(["git","status","-sb"], capture_output=True, text=True)
        return "ahead" in (status.stdout or "")
    left_right = (ahead.stdout or "").strip().split()
    return len(left_right) == 2 and left_right[0].isdigit() and int(left_right[0]) > 0

def push_seguro(owner="LeoBR84p", repo="temporal-graph-network", username="LeoBR84p"):
    #realiza push usando pat em memória; restaura url limpa ao final
    clean_url = f"https://github.com/{owner}/{repo}.git"
    token = getpass.getpass("Cole seu GitHub PAT (Contents: Read and write): ").strip()
    auth_url = f"https://{username}:{token}@github.com/{owner}/{repo}.git"
    test = subprocess.run(["git","ls-remote", auth_url], capture_output=True, text=True)
    if test.returncode != 0:
        print("Falha na autenticação (read). Revise token/permissões:")
        print(test.stderr or test.stdout); return
    try:
        subprocess.run(["git","remote","set-url","origin", auth_url], check=True)
        out = subprocess.run(["git","push","origin","main"], capture_output=True, text=True)
        if out.returncode != 0:
            print("Falha no push (write). Revise permissões do token:")
            print(out.stderr or out.stdout)
        else:
            print("Push concluído com PAT.")
    finally:
        subprocess.run(["git","remote","set-url","origin", clean_url], check=False)

#montar/remontar o google drive e entrar no projeto
safe_mount_google_drive()
os.makedirs(BASE, exist_ok=True)
if not os.path.exists(PROJ):
    print(f"Atenção: pasta do projeto não encontrada em {PROJ}. "
          "Execute seu bloco de configuração única (clone) primeiro.")
else:
    print("Pasta do projeto encontrada.")
safe_chdir(PROJ)

#sanity check do repositório git
if not os.path.isdir(".git"):
    print("Aviso: esta pasta não parece ser um repositório Git (.git ausente). "
          "Rode o bloco de configuração única.")
else:
    print("Repositório Git detectado.")

#instalar dependências efêmeras desta sessão
!pip -q install jupytext nbdime nbstripout
!nbdime config-git --enable --global

#atualizar do remoto
!git fetch origin
!git pull --rebase origin main

#sincronizar notebooks → .py (jupytext)
!jupytext --sync notebooks/*.ipynb || true

#ciclo de versionamento do dia (commit genérico opcional)
!git add -A
!git status
!git commit -m "feat: ajustes no notebook X e pipeline Y" || true

#push somente se houver commits locais à frente; com fallback para pat
if branch_a_frente():
    out = subprocess.run(["git","push","origin","main"], capture_output=True, text=True)
    if out.returncode == 0:
        print("Push concluído sem PAT.")
    else:
        print("Push sem PAT falhou. Chamando push_seguro()…")
        push_seguro()
else:
    print("Nada para enviar (branch sincronizada com o remoto).")

###**Utilitários Git**

In [None]:
# @title
#helpers de git: push seguro, commit customizado e tag de release
import subprocess, getpass

#ajuste se você mudar o nome do repositório/usuário
OWNER = "LeoBR84p"
REPO = "temporal-graph-network"
USERNAME = "LeoBR84p"
BRANCH = "main"
REMOTE = "origin"

def branch_a_frente():
    #retorna true se head está à frente do upstream (há o que enviar)
    out = subprocess.run(
        ["git","rev-list","--left-right","--count",f"HEAD...@{{upstream}}"],
        capture_output=True, text=True
    )
    if out.returncode != 0:
        st = subprocess.run(["git","status","-sb"], capture_output=True, text=True)
        return "ahead" in (st.stdout or "")
    left_right = (out.stdout or "").strip().split()
    return len(left_right) == 2 and left_right[0].isdigit() and int(left_right[0]) > 0

def push_seguro(owner=OWNER, repo=REPO, username=USERNAME, remote=REMOTE, branch=BRANCH):
    #realiza push usando pat em memória; restaura url limpa ao final
    clean_url = f"https://github.com/{owner}/{repo}.git"
    token = getpass.getpass("cole seu github pat (contents: read and write): ").strip()
    auth_url = f"https://{username}:{token}@github.com/{owner}/{repo}.git"
    test = subprocess.run(["git","ls-remote", auth_url], capture_output=True, text=True)
    if test.returncode != 0:
        print("falha na autenticação (read). revise token/permissões:")
        print(test.stderr or test.stdout); return False
    try:
        subprocess.run(["git","remote","set-url", remote, auth_url], check=True)
        out = subprocess.run(["git","push", remote, branch], capture_output=True, text=True)
        if out.returncode != 0:
            print("falha no push (write). revise permissões do token:")
            print(out.stderr or out.stdout); return False
        print("push concluído com pat.")
        return True
    finally:
        subprocess.run(["git","remote","set-url", remote, clean_url], check=False)

def try_push_branch(remote=REMOTE, branch=BRANCH):
    #tenta push direto da branch atual
    out = subprocess.run(["git","push", remote, branch], capture_output=True, text=True)
    if out.returncode == 0:
        print("push concluído.")
        return True
    print("push sem credencial falhou:")
    print((out.stderr or out.stdout).strip())
    return False

def try_push_tag(tag, remote=REMOTE):
    #tenta enviar somente a tag
    out = subprocess.run(["git","push", remote, tag], capture_output=True, text=True)
    if out.returncode == 0:
        print(f"tag enviada: {tag}")
        return True
    print("falha ao enviar a tag:")
    print((out.stderr or out.stdout).strip())
    return False

def commit_custom(msg: str, auto_push: bool = True):
    #adiciona tudo, cria commit com a mensagem informada e faz push opcional (com fallback para pat)
    subprocess.run(["git","add","-A"], check=False)
    com = subprocess.run(["git","commit","-m", msg], capture_output=True, text=True)
    if com.returncode != 0:
        print((com.stderr or com.stdout or "nada para commitar.").strip())
        return
    print(com.stdout.strip())
    if auto_push and branch_a_frente():
        if not try_push_branch():
            print("tentando push seguro…")
            push_seguro()

def tag_release(tag: str, message: str = "", auto_push: bool = True):
    #cria uma tag anotada (release) e faz push da tag com fallback para pat
    exists = subprocess.run(["git","tag","--list", tag], capture_output=True, text=True)
    if tag in (exists.stdout or "").split():
        print(f"tag '{tag}' já existe. para refazer: git tag -d {tag} && git push {REMOTE} :refs/tags/{tag}")
        return
    args = ["git","tag","-a", tag, "-m", (message or tag)]
    mk = subprocess.run(args, capture_output=True, text=True)
    if mk.returncode != 0:
        print("falha ao criar a tag:")
        print(mk.stderr or mk.stdout); return
    print(f"tag criada: {tag}")
    if auto_push:
        if not try_push_tag(tag):
            print("tentando push seguro da tag…")
            if push_seguro():
                try_push_tag(tag)

#**Sincronizar alterações no código do projeto**
Comandos para sincronizar código (Google Drive, Git, GitHub) e realizar versionamento

Tag de release atual: 1.0

In [None]:
# @title
#ID0002
#push do Drive -> GitHub (Drive é a fonte da verdade)
#respeita .gitignore do Drive
#sempre em 'main', sem pull, commit + push imediato
#mensagem de commit padronizada com timestamp SP
#bump de versão (M/m/n) + tag anotada
#force push (branch e tags), silencioso; só 1 print final
#PAT lido de segredo do Colab: GITHUB_PAT_DA (fallback: env; último caso: prompt)

from pathlib import Path
import subprocess, os, re, shutil, sys, getpass
from urllib.parse import quote as urlquote
from datetime import datetime, timezone, timedelta

#utilitários silenciosos
def sh(cmd, cwd=None, check=True):
    """
    Executa comando silencioso. Em erro, levanta RuntimeError com rc e UM rascunho de causa,
    mascarando URLs com credenciais (ex.: https://***:***@github.com/...).
    """
    safe_cmd = []
    for x in cmd:
        if isinstance(x, str) and "github.com" in x and "@" in x:
            #mascara credenciais: https://user:token@ -> https://***:***@
            x = re.sub(r"https://[^:/]+:[^@]+@", "https://***:***@", x)
        safe_cmd.append(x)

    r = subprocess.run(cmd, cwd=cwd, text=True, capture_output=True)
    if check and r.returncode != 0:
        #heurística curtinha p/ tornar rc=128 mais informativo sem vazar nada
        stderr = (r.stderr or "").strip().lower()
        if "authentication failed" in stderr or "permission" in stderr or "not found" in stderr:
            hint = "auth/permissões/URL"
        elif "not a git repository" in stderr:
            hint = "repo local inválido"
        else:
            hint = "git falhou"
        cmd_hint = " ".join(safe_cmd[:3])
        raise RuntimeError(f"rc={r.returncode}; {hint}; cmd={cmd_hint}")
    return r.stdout

def git(*args, cwd=None, check=True):
    return sh(["git", *args], cwd=cwd, check=check)

#configurações do projeto
author_name    = "Leandro Bernardo Rodrigues"
owner          = "LeoBR84p"         # dono do repositório no GitHub
repo_name      = "data-analysis"    # nome do repositório
default_branch = "main"
repo_dir       = Path("/content/drive/MyDrive/Notebooks/temporal-graph-network")
remote_base    = f"https://github.com/{owner}/{repo_name}.git"
author_email   = f"bernardo.leandro@gmail.com"  # evita erro de identidade

#nbstripout: "install" para limpar outputs; "disable" para versionar outputs
nbstripout_mode = "install"
import shutil
exe = shutil.which("nbstripout")
git("config", "--local", "filter.nbstripout.clean", exe if exe else "nbstripout", cwd=repo_dir)

#ambiente: Colab + Drive
def ensure_drive():
    try:
        from google.colab import drive  # type: ignore
        base = Path("/content/drive/MyDrive")
        if not base.exists():
            drive.mount("/content/drive")
        if not base.exists():
            raise RuntimeError("Google Drive não montado.")
    except Exception as e:
        raise RuntimeError(f"Falha ao montar o Drive: {e}")

#repo local no Drive
def is_empty_dir(p: Path) -> bool:
    try:
        return p.exists() and not any(p.iterdir())
    except Exception:
        return True

def init_or_recover_repo():
    repo_dir.mkdir(parents=True, exist_ok=True)
    git_dir = repo_dir / ".git"

    def _fresh_init():
        if git_dir.exists():
            shutil.rmtree(git_dir, ignore_errors=True)
        git("init", cwd=repo_dir)

    #caso .git no Colab ausente ou vazia -> init limpo
    if not git_dir.exists() or is_empty_dir(git_dir):
        _fresh_init()
    else:
        #valida se é um work-tree git funcional no Colab; se falhar -> init limpo
        try:
            git("rev-parse", "--is-inside-work-tree", cwd=repo_dir)
        except Exception:
            _fresh_init()

    #aborta operações pendentes (não apaga histórico)
    for args in (("rebase", "--abort"), ("merge", "--abort"), ("cherry-pick", "--abort")):
        try:
            git(*args, cwd=repo_dir, check=False)
        except Exception:
            pass

    #força branch main
    try:
        sh(["git", "switch", "-C", default_branch], cwd=repo_dir)
    except Exception:
        sh(["git", "checkout", "-B", default_branch], cwd=repo_dir)

    #configura identidade local
    try:
        git("config", "user.name", author_name, cwd=repo_dir)
        git("config", "user.email", author_email, cwd=repo_dir)
    except Exception:
        pass

    #marca o diretório como safe
    try:
        sh(["git","config","--global","--add","safe.directory", str(repo_dir)])
    except Exception:
        pass

    #sanity check final (falha cedo se algo ainda estiver errado)
    git("status", "--porcelain", cwd=repo_dir)


#nbstripout (opcional)
def setup_nbstripout():
    if nbstripout_mode == "disable":
        #remove configs do filtro
        sh(["git","config","--local","--unset-all","filter.nbstripout.clean"], cwd=repo_dir, check=False)
        sh(["git","config","--local","--unset-all","filter.nbstripout.smudge"], cwd=repo_dir, check=False)
        sh(["git","config","--local","--unset-all","filter.nbstripout.required"], cwd=repo_dir, check=False)
        gat = repo_dir / ".gitattributes"
        if gat.exists():
            lines = gat.read_text(encoding="utf-8", errors="ignore").splitlines()
            new_lines = [ln for ln in lines if "filter=nbstripout" not in ln]
            gat.write_text("\n".join(new_lines) + ("\n" if new_lines else ""), encoding="utf-8")
        return

    #instala nbstripout (se necessário)
    try:
        import nbstripout  #noqa: F401
    except Exception:
        sh([sys.executable, "-m", "pip", "install", "--quiet", "nbstripout"])

    py = sys.executable
    #configurar filtro sem aspas extras
    git("config", "--local", "filter.nbstripout.clean", "nbstripout", cwd=repo_dir)
    git("config", "--local", "filter.nbstripout.smudge", "cat", cwd=repo_dir)
    git("config", "--local", "filter.nbstripout.required", "true", cwd=repo_dir)
    gat = repo_dir / ".gitattributes"
    line = "*.ipynb filter=nbstripout"
    if gat.exists():
        txt = gat.read_text(encoding="utf-8", errors="ignore")
        if line not in txt:
            gat.write_text((txt.rstrip() + "\n" + line + "\n"), encoding="utf-8")
    else:
        gat.write_text(line + "\n", encoding="utf-8")

#.gitignore normalização
def normalize_tracked_ignored():
    """
    Se houver arquivos já rastreados que hoje são ignorados pelo .gitignore,
    limpa o índice e re-adiciona respeitando o .gitignore.
    Retorna True se normalizou algo; False caso contrário.
    """
    #remove lock de índice, se houver
    lock = repo_dir / ".git/index.lock"
    try:
        if lock.exists():
            lock.unlink()
    except Exception:
        pass

    #garante que o índice existe (ou se recupera)
    idx = repo_dir / ".git/index"
    if not idx.exists():
        try:
            sh(["git", "reset", "--mixed"], cwd=repo_dir)
        except Exception:
            try:
                sh(["git", "init"], cwd=repo_dir)
            except Exception:
                pass

    #detecta arquivos ignorados que estão rastreados e normaliza
    normalized = False
    try:
        out = git("ls-files", "-z", "--ignored", "--exclude-standard", "--cached", cwd=repo_dir)
        tracked_ignored = [p for p in out.split("\x00") if p]
        if tracked_ignored:
            git("rm", "-r", "--cached", ".", cwd=repo_dir)
            git("add", "-A", cwd=repo_dir)
            normalized = True
    except Exception:
        #falhou a detecção? segue o fluxo sem travar
        pass

    return normalized

#semVer e bump de versão
_semver = re.compile(r"^(\d+)\.(\d+)\.(\d+)$")

def parse_semver(s):
    m = _semver.match((s or "").strip())
    return tuple(map(int, m.groups())) if m else None

def current_version():
    try:
        tags = [t for t in git("tag", "--list", cwd=repo_dir).splitlines() if parse_semver(t)]
        if tags:
            return sorted(tags, key=lambda x: parse_semver(x))[-1]
    except Exception:
        pass
    vf = repo_dir / "VERSION"
    if vf.exists():
        v = vf.read_text(encoding="utf-8").strip()
        if parse_semver(v):
            return v
    return "1.0.0"

def bump(v, kind):
    M, m, p = parse_semver(v) or (1, 0, 0)
    k = (kind or "").strip()
    if k == "m":
        return f"{M}.{m+1}.0"
    if k == "n":
        return f"{M}.{m}.{p+1}"
    return f"{M+1}.0.0"  #default major

#timestamp SP
def now_sp():
    #tenta usar zoneinfo; fallback fixo -03:00 (Brasil sem DST atualmente)
    try:
        from zoneinfo import ZoneInfo  # Py3.9+
        tz = ZoneInfo("America/Sao_Paulo")
        dt = datetime.now(tz)
    except Exception:
        dt = datetime.now(timezone(timedelta(hours=-3)))
    #formato legível + offset
    return dt.strftime("%Y-%m-%d %H:%M:%S%z")  # ex.: 2025-10-08 02:34:00-0300

#autenticação (PAT)
def get_pat():
    #Colab Secrets
    token = None
    try:
        from google.colab import userdata  #type: ignore
        token = userdata.get('GITHUB_PAT_DA')  #nome do segredo criado no Colab
    except Exception:
        token = None
    #fallback1 - variável de ambiente
    if not token:
        token = os.environ.get("GITHUB_PAT_DA") or os.environ.get("GITHUB_PAT")
    #fallback2 - interativo
    if not token:
        token = getpass.getpass("Informe seu GitHub PAT: ").strip()
    if not token:
        raise RuntimeError("PAT ausente.")
    return token

#listas de força
FORCE_UNTRACK = ["input/", "output/", "data/", "runs/", "logs/", "figures/"]
FORCE_TRACK   = ["references/"]  #versionar tudo dentro (PDFs inclusive)

def force_index_rules():
    #garante que pastas sensíveis NUNCA fiquem rastreadas
    for p in FORCE_UNTRACK:
        try:
            git("rm", "-r", "--cached", "--", p, cwd=repo_dir)
        except Exception:
            pass
    #garante que references/ SEMPRE entre (útil se ainda há *.pdf globais)
    for p in FORCE_TRACK:
        try:
            git("add", "-f", "--", p, cwd=repo_dir)
        except Exception:
            pass

#fluxo principal
def main():
    try:
        ensure_drive()
        init_or_recover_repo()
        setup_nbstripout()

        #pergunta apenas o tipo de versão (M/m/n)
        kind = input("Informe o tipo de mudança: Maior (M), menor (m) ou pontual (n): ").strip()
        if kind not in ("M", "m", "n"):
            kind = "n"

        #versão
        cur = current_version()
        new = bump(cur, kind)
        (repo_dir / "VERSION").write_text(new + "\n", encoding="utf-8")

        #normaliza itens ignorados que estejam rastreados (uma única vez, se necessário)
        normalize_tracked_ignored()

        #aplica regras de força
        force_index_rules()

        #stage de tudo (Drive é a verdade; remoções entram aqui)
        git("add", "-A", cwd=repo_dir)

        #mensagem padronizada de commit
        ts = now_sp()
        commit_msg = f"upload pelo {author_name} em {ts}"
        try:
            git("commit", "-m", commit_msg, cwd=repo_dir)
        except Exception:
            #se nada a commitar, seguimos (pode ocorrer se só a tag mudar, mas aqui VERSION muda)
            status = git("status", "--porcelain", cwd=repo_dir)
            if status.strip():
                raise

        #Tag anotada (substitui se já existir)
        try:
            git("tag", "-a", new, "-m", f"release {new} — {commit_msg}", cwd=repo_dir)
        except Exception:
            sh(["git", "tag", "-d", new], cwd=repo_dir, check=False)
            git("tag", "-a", new, "-m", f"release {new} — {commit_msg}", cwd=repo_dir)

        #push com PAT (Drive é a verdade): validação + push forçado
        token = get_pat()
        user_for_url = owner  # você é o owner; não perguntamos
        auth_url = f"https://{urlquote(user_for_url, safe='')}:{urlquote(token, safe='')}@github.com/{owner}/{repo_name}.git"

        #valida credenciais/URL de forma silenciosa (sem vazar token)
        #tenta checar a branch main; se não existir (repo vazio), faz um probe genérico
        try:
            sh(["git", "ls-remote", auth_url, f"refs/heads/{default_branch}"], cwd=repo_dir)
        except RuntimeError:
            #repositório pode estar vazio (sem refs); probe sem ref deve funcionar
            sh(["git", "ls-remote", auth_url], cwd=repo_dir)

        #push forçado de branch e tags
        sh(["git", "push", "-u", "--force", auth_url, default_branch], cwd=repo_dir)
        sh(["git", "push", "--force", auth_url, "--tags"], cwd=repo_dir)

        print(f"[ok]   Registro no GitHub com sucesso. Versão atual {new}")
    except Exception as e:
        #mensagem única, curta, sem detalhes sensíveis
        msg = str(e) or "falha inesperada"
        print(f"[erro] {msg}")

#executa
if __name__ == "__main__":
    main()

#**Checklist rápido de execução**
**Etapas:**
- 01–05: setup (ambiente, dependências, diretórios, configs e upload de CSVs)
- 06–10: execução (consumo dos dados, criação de grafos, config das janelas temporais, agregação de infos aos grafos, config dos modelos matemáticos)
- 11-15: geração de output (salva análise, gera gráficos gerais, gera gráficos específicos e relatórios em HTML+PDF)

#**Temporal Graph Network / Rede de Grapho Temporal**

Uma **Rede de Graphos Temporais (TGN)** é um modelo de aprendizado de máquina que processa dados representados como um grafo dinâmico. Ela captura a evolução da estrutura e das conexões de entidades (nós) ao longo do tempo. **Ou seja, ela leva em consideração o comportamento temporal das atividades e seu relacionamento, ao invés de uma avaliação única e estanque no tempo.**
_____

**Caso aplicado: Detecção de anomalias sem gabarito (sem dados históricos)**

Imagine uma rede de transações financeiras. A TGN analisa o histórico de como cada beneficiário, usuário demandante do pagamento e unidade de negócio (nós) se conectam e interagem uns com os outros. Sem saber o que é uma anomalia, ela aprende o comportamento normal da rede.
Ao notar um padrão atípico, como um usuário que subitamente começa a demandar transferências para muitas novas contas em um curto período, a TGN destaca isso como uma anomalia comportamental. Ela usa a história do nó e o contexto temporal para sinalizar o desvio, sem precisar de exemplos de anomalia pré-existentes.


### **Etapa 1:** Ativação do ambiente virtual (utilizando atualmente Google Colab para prototipação com dados sintéticos)
---
Necessário ajustar pontualmente em caso de utilização em outro ambiente de notebook.

In [None]:
# @title
# ID001
import os

# Define the path for the virtual environment inside Google Drive
# Ensure BASE and REPO are defined correctly from previous cells if needed
# Assuming BASE and REPO are defined as in cell otaQwrjJSgOQ
BASE = "/content/drive/MyDrive/Notebooks"
REPO = "temporal-graph-network"
PROJ = f"{BASE}/{REPO}"
VENV_PATH = f"{PROJ}/.venv_tgn" # Updated venv path to be a hidden folder inside PROJ

# Cria o ambiente virtual temporal-graph_network inside the project folder
# Use --clear if you want to recreate it every time this cell runs
!python -m venv "{VENV_PATH}"

# Ativa o ambiente virtual
# No Colab, a forma de ativar um ambiente virtual é um pouco diferente
# pois não há um shell interativo tradicional.
# A maneira mais comum é adicionar o diretório binário do ambiente virtual
# ao PATH da sessão atual.

# Adiciona o diretório binário do ambiente virtual ao PATH
# Isso permite que você execute executáveis (como pip, python)
# do ambiente virtual recém-criado.
# Use os.pathsep to be platform-independent
os.environ['PATH'] = f"{VENV_PATH}/bin{os.pathsep}{os.environ['PATH']}"

print("Erro de upgrade do pip é normal no Google Colab. \033[1mPode prosseguir.\033[0m")
print(f"Ambiente virtual '{VENV_PATH}' criado e ativado no PATH.")
!which python

# Mensagem isolada com humor (Skynet)
from IPython.display import display, HTML
display(HTML('<div style="margin:12px 0;padding:8px 12px;border:1px dashed #999;">'
             '<b>🤖 Skynet</b>: T-800 ativado. Diagnóstico do ambiente concluído. 🎯 Alvo principal: organização do notebook.'
             '</div>'))

### **Etapa 2:** Instalar as dependências de bibliotecas Python compatíveis com a versão mais moderna disponível.
---
Para uso no JupytherHub (versão atual python 3.7.9) é necessário realizar updgrade do Python do usuário e/ou adaptar as bibliotecas.

---
É possível que as bibliotecas mais atuais de **numpy e scipy** possuam incompatibilidade. Nesse caso, force a desinstalação das bibliotecas na versão atual **Código {!pip uninstall -y numpy pandas scipy scikit-learn}** e comande a instalação das versões compatíveis entre si.

---
Comportamento estável nas versões:
- numpy: 2.0.2
- scipy: 1.16.2
- pandas: 2.3.3
- sklearn: 1.7.2
- networkx: 3.5
- matplotlib: 3.10.6
- pyod: 2.0.5
- tqdm: 4.67.1
- reportlab: 3.6.12 (via pep517)

In [None]:
# @title
# ID002
import sys, subprocess
from importlib import import_module

def pip_command(command, packages, force=False, extra_args=None):
    cmd = [sys.executable, "-m", "pip", command]
    if force:
        cmd.append("--yes") # Use --yes for uninstall to avoid prompts
    if extra_args:
        cmd += list(extra_args)
    cmd += list(packages)
    print("Executando:", " ".join(cmd))
    subprocess.check_call(cmd)

def show_versions(mods):
    print("\n=== Versões carregadas ===")
    for mod in mods:
        try:
            m = import_module(mod)
            v = getattr(m, "__version__", "n/a")
            print(f"{mod}: {v}")
        except ImportError:
            print(f"{mod}: Não instalado")
    print("==========================\n")

CORE_MODS = ("numpy", "scipy", "pandas", "sklearn", "networkx", "matplotlib", "pyod", "tqdm", "reportlab")

# Update pip
pip_command("install", ["pip"], extra_args=["--upgrade"])

# Force uninstall specific libraries
pip_command("uninstall", ["numpy", "pandas", "scipy", "scikit-learn"], force=True)

# Install specified versions
PKGS_TO_INSTALL = [
    "numpy==2.0.2",
    "scipy==1.16.2",
    "pandas==2.3.3",
    "scikit-learn==1.7.2",
    "networkx==3.5",
    "matplotlib==3.10.6",
    "pyod==2.0.5",
    "tqdm==4.67.1",
    "reportlab==3.6.12" # Added reportlab installation
]
pip_command("install", PKGS_TO_INSTALL, extra_args=["--use-pep517"]) # Added --use-pep517 here

# Show installed versions
show_versions(CORE_MODS)

# Mensagem isolada com humor (Skynet)
from IPython.display import display, HTML
display(HTML('<div style="margin:12px 0;padding:8px 12px;border:1px dashed #999;">'
             '<b>🤖 Skynet</b>: Atualizando bibliotecas. Encontrarmos alguns pacotes rebeldes, '
             'mas aplicamos persuasão… com pip. 😎</div>'))

###**Etapa 3:** Configura a pasta onde devem ser inseridos os dados de input e output do modelo, caso elas ainda não existam.

In [None]:
# @title
# ID003 — criação de pastas base alinhadas ao Drive/ID004
import os
from pathlib import Path
from google.colab import drive

# 1) Se o ID004 já rodou, reaproveita ROOT (pasta do projeto no Drive)
if 'ROOT' in globals():
    BASE_DIR = Path(ROOT)
else:
    # 2) Caso contrário, monta o Drive (se preciso) e usa o caminho padrão do projeto
    if not os.path.ismount("/content/drive"):
        print("Montando Google Drive...")
        drive.mount("/content/drive")
    # ajuste aqui se seu projeto estiver em outra pasta
    BASE_DIR = Path("/content/drive/MyDrive/Notebooks/temporal-graph-network").resolve()

INPUT_DIR = BASE_DIR / "input"
OUTPUT_DIR = BASE_DIR / "output"

for d in (INPUT_DIR, OUTPUT_DIR):
    d.mkdir(parents=True, exist_ok=True)

print(f"Diretórios prontos:\n - {INPUT_DIR}\n - {OUTPUT_DIR}")

# Mensagem adicional isolada (Skynet) — mantém exatamente como você escreveu
from IPython.display import display, HTML
display(HTML('<div style="margin:12px 0;padding:8px 12px;border:1px dashed #999;">'
             '<b>🤖 Skynet</b>: Novos modelos neurais para T-800 construídos. Armazéns de CSVs alinhados. '
             'Layout aprovado pela Cyberdyne Systems. 🗂️</div>'))

###**Etapa 4:** Importações das bibliotecas Python e configurações gerais para execução do código
- seed
- associação das pastas criadas às variáveis de execução
- logs

In [None]:
# @title
# ID004
import os, shutil, json, math, warnings, random, gc
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo  # << fuso São Paulo
import numpy as np
import pandas as pd
import networkx as nx
import matplotlib.pyplot as plt
from tqdm import tqdm
from pathlib import Path
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM

# Seeds reprodutibilidade
SEED = 42
np.random.seed(SEED)
random.seed(SEED)

# Estrutura de diretórios (mesma base do projeto)
BASE = "/content/drive/MyDrive/Notebooks"
REPO = "temporal-graph-network"
PROJ = f"{BASE}/{REPO}"

ROOT = Path(PROJ).resolve()
INPUT_DIR = ROOT / "input"
INPUT_CSV = INPUT_DIR / "input.csv"
EXEC_ROOT = ROOT / "output"   # pasta base correta: output

# >>> FUSO HORÁRIO SÃO PAULO + prefixo TGN_ na subpasta da execução
SAO_TZ = ZoneInfo("America/Sao_Paulo")
run_id = datetime.now(SAO_TZ).strftime("TGN_%Y-%m-%d_%H-%M-%S")  # ex.: TGN_2025-10-10_02-03-38

RUN_DIR = EXEC_ROOT / run_id
FIG_DIR = RUN_DIR / "figuras"
RUN_DIR.mkdir(parents=True, exist_ok=True)
FIG_DIR.mkdir(parents=True, exist_ok=True)

# Arquivos de saída
LOG_FILE = RUN_DIR / "log.txt"
RUN_META = RUN_DIR / "run_meta.json"
OUTPUT_CSV = RUN_DIR / "output.csv"

# Logger simples para arquivo
from contextlib import contextmanager

@contextmanager
def tee_log(log_path):
    import sys
    class Tee(object):
        def __init__(self, name, mode):
            self.file = open(name, mode, encoding="utf-8")
            self.stdout = sys.stdout
        def write(self, data):
            self.file.write(data)
            self.stdout.write(data)
        def flush(self, *args, **kwargs):
            self.file.flush()
            self.stdout.flush()
    tee = Tee(str(log_path), "w")
    old_stdout = sys.stdout
    sys.stdout = tee
    try:
        yield
    finally:
        sys.stdout = old_stdout
        tee.file.close()

# Metadados da execução (inclui timezone)
meta = {
    "run_id": run_id,
    "created_at": datetime.now(SAO_TZ).isoformat(),  # com offset -03:00 ou -02:00 (DST)
    "timezone": "America/Sao_Paulo",
    "seed": SEED,
    "input_csv_expected": str(INPUT_CSV),
    "output_csv": str(OUTPUT_CSV),
    "figures_dir": str(FIG_DIR),
    "notes": "Detecção de anomalias em rede temporal"
}
json.dump(meta, open(RUN_META, "w"), indent=2, ensure_ascii=False)

warnings.filterwarnings("ignore")
print(f"RUN_DIR: {RUN_DIR}")

# Mensagem adicional isolada (Skynet) — (não alterada)
from IPython.display import display, HTML
display(HTML('<div style="margin:12px 0;padding:8px 12px;border:1px dashed #999;"><b>🤖 Skynet</b>: T-800, parâmetros centrais em memória.🧠</div>'))

###**Etapa 5:** Importação dos arquivos de input para posterior execução.
---
Implementação atual configurada para Google Colab e permitindo o uso do Google Drive. Para uso em versões futuras é recomendado ajustar para o ambiente de implementação adotado (salvamento em pastas ou apenas upload pelo usuário)

---
Implementação de upload por FileLocal (diretório) apresentando erro no Colab.

Implementar correção **TODO[001]** *prioridade baixa*

####**Sub-etapa específica para uso no Colab:** Montagem do Google Drive (rodar apenas 1x)

In [None]:
# @title
# ID005
# === SETUP GERAL (rode esta célula 1x) ===
import os, shutil, glob
from google.colab import drive
from IPython.display import display, HTML  # usado pela mensagem Skynet

# Ensure BASE and REPO are defined correctly from previous cells if needed
# Assuming BASE and REPO are defined as in cell otaQwrjJSgOQ
try:
    BASE = "/content/drive/MyDrive/Notebooks"
    REPO = "temporal-graph-network"
    PROJ = f"{BASE}/{REPO}"
except NameError:
    # Fallback if BASE/REPO are not defined, though they should be by now
    PROJ = "/content/temporal-graph-network"


# Se não existir INPUT_DIR definido antes no notebook, cria um padrão:
# Using PROJ to define INPUT_DIR
INPUT_DIR = os.path.join(PROJ, "input")


os.makedirs(INPUT_DIR, exist_ok=True)

TARGET_NAME = "input.csv"
TARGET_PATH = os.path.join(INPUT_DIR, TARGET_NAME)

# Monta o Google Drive (somente se ainda não estiver montado)
if not os.path.ismount("/content/drive"):
    print("Montando Google Drive...")
    drive.mount("/content/drive")
else:
    print("Google Drive já montado.")

def _is_csv_filename(name: str) -> bool:
    return name.lower().endswith(".csv")

def _mensagem_skynet_ok():
    # Mensagem adicional isolada (Skynet)
    display(HTML(
        '<div style="margin:12px 0;padding:8px 12px;border:1px dashed #999;"><b>🤖 Skynet</b>: Munição carregada.🧨'
                 '</div>'
    ))

def _save_bytes_as_input_csv(name: str, data: bytes):
    if not _is_csv_filename(name):
        raise ValueError(f"O arquivo '{name}' não possui extensão .csv.")
    with open(TARGET_PATH, "wb") as f:
        f.write(data)
    print(f"Arquivo '{name}' salvo como '{TARGET_NAME}' em: {TARGET_PATH}")
    _mensagem_skynet_ok()

def _copy_drive_file_to_input_csv(src_path: str):
    if not os.path.exists(src_path):
        raise FileNotFoundError(f"O caminho '{src_path}' não existe.")
    if not _is_csv_filename(src_path):
        raise ValueError(f"O arquivo '{src_path}' não possui extensão .csv.")
    shutil.copyfile(src_path, TARGET_PATH)
    print(f"Arquivo do Drive copiado e salvo como '{TARGET_NAME}' em: {TARGET_PATH}")
    _mensagem_skynet_ok()

####**Sub-etapa:** Opção de upload do input.csv pelo Drive

In [None]:
# @title
# ID006
def escolher_csv_no_drive(raiz="/content/drive/MyDrive", max_listar=200):
    print(f"Procurando arquivos .csv em: {raiz} (pode levar alguns segundos)...")
    padrao = os.path.join(raiz, "**", "*.csv")
    arquivos = glob.glob(padrao, recursive=True)

    if not arquivos:
        print("Nenhum .csv encontrado nessa pasta.")
        caminho = input("Cole o caminho COMPLETO do .csv no Drive (ou Enter p/ cancelar): ").strip()
        if caminho:
            _copy_drive_file_to_input_csv(caminho)
        else:
            print("Operação cancelada.")
        return

    arquivos = sorted(arquivos)[:max_listar]
    print(f"Encontrados {len(arquivos)} arquivo(s).")
    for i, p in enumerate(arquivos, 1):
        print(f"[{i:03}] {p}")

    escolha = input("\nDigite o número do arquivo desejado (ou cole o caminho absoluto): ").strip()

    if escolha.isdigit():
        idx = int(escolha)
        if 1 <= idx <= len(arquivos):
            _copy_drive_file_to_input_csv(arquivos[idx-1])
        else:
            print("Índice inválido.")
    elif escolha:
        _copy_drive_file_to_input_csv(escolha)
    else:
        print("Operação cancelada.")

# ===== Execução da seleção no Drive =====
raiz = input("Informe a pasta raiz para busca no Drive (Enter = /content/drive/MyDrive): ").strip()
if not raiz:
    raiz = "/content/drive/MyDrive"

try:
    escolher_csv_no_drive(raiz=raiz)
except Exception as e:
    print(f"Erro na seleção via Drive: {e}")

###**Etapa 6** Leitura e validação dos dados de input.

**Formato do arquivo de input:** CSV UTF-8 com BOM separado por **ponto e vírgula**.

**Informações esperadas:**
- username: código login do usuário;
- lotacao: lotação funcional no formato Área; Área/Depto; ou Área/Depto/Gerência;
- valor: valor financeiro em reais com até duas casas decimais
- beneficiario: CPF ou CNPJ no formato alfanumérico sem pontos ou caracteres especiais.
- timestamp: data e hora da transação no formato dd/mm/aaaa hh:mm

In [None]:
# @title
required_any_timestamp = [["timestamp"], ["data","hora"]]
# Atualiza colunas base esperadas para mapear do input
required_cols_base = [
    "username", "lotacao", "valor", "beneficiario" # Colunas no arquivo de input
]
optional_cols = ["trans_id"]

# Mapeamento das colunas do input para os nomes usados no código
column_mapping = {
    "username": "user_id",
    "lotacao": "unidade_origem",
    "valor": "valor_pago",
    "beneficiario": "beneficiario_id"
}

def has_timestamp_columns(df):
    cols = set(df.columns.str.lower())
    for grp in required_any_timestamp:
        if all(c in cols for c in grp):
            return grp
    return None

with tee_log(LOG_FILE):
    assert INPUT_CSV.exists(), f"Arquivo não encontrado: {INPUT_CSV}"

    # Tenta ler o CSV usando ponto e vírgula como separador
    try:
        df = pd.read_csv(INPUT_CSV, sep=';')
        print("[INFO] CSV lido com sucesso usando ';'.")
    except Exception as e:
        # Se falhar com ';', tenta com ','
        print(f"[AVISO] Falha ao ler CSV com ';': {e}. Tentando com ','.")
        try:
             df = pd.read_csv(INPUT_CSV, sep=',')
             print("[INFO] CSV lido com sucesso usando ','.")
        except Exception as e2:
             raise AssertionError(f"Falha ao ler CSV com ';' ou ',': {e2}") from e2


    df.columns = [c.strip() for c in df.columns]

    # Verifica se as colunas do input existem
    cols_lower = {c.lower(): c for c in df.columns}
    missing_input_cols = [c for c in required_cols_base if c.lower() not in cols_lower]
    assert not missing_input_cols, f"Colunas do arquivo de input ausentes: {missing_input_cols}. Colunas encontradas: {list(df.columns)}" # Adicionado colunas encontradas para debug

    # Renomeia as colunas usando o mapeamento
    df.rename(columns={cols_lower[k.lower()]: v for k, v in column_mapping.items() if k.lower() in cols_lower}, inplace=True)

    # Verifica colunas de timestamp
    ts_group = has_timestamp_columns(df)
    # Agora levanta um erro se não houver timestamp
    assert ts_group is not None, "Coluna de timestamp ('timestamp' ou 'data'/'hora') não encontrada no arquivo de input."

    # Código original para lidar com timestamp ou data/hora
    # Recria o helper col para usar nomes *após* renomear
    def col(c): return {name.lower(): name for name in df.columns}[c.lower()]
    if ts_group == ["timestamp"]:
        df["timestamp"] = pd.to_datetime(df[col("timestamp")], errors="coerce")
    else:
        df["data"] = pd.to_datetime(df[col("data")], errors="coerce").dt.date
        df["hora"] = pd.to_datetime(df[col("hora")], errors="coerce").dt.time
        # Combina data e hora, lidando com possíveis NaT na data ou hora
        df["timestamp"] = pd.to_datetime(df["data"].astype(str) + " " + df["hora"].astype(str), errors="coerce")
        # Remove as colunas temporárias 'data' e 'hora' se existirem e não forem as colunas originais
        if col("data") != "data": del df["data"]
        if col("hora") != "hora": del df["hora"]


    # Tipos básicos (usa os nomes *após* o mapeamento)
    # Adiciona checagens para garantir que as colunas mapeadas existam antes de converter tipos
    if "valor_pago" in df.columns:
        df["valor_pago"] = pd.to_numeric(df["valor_pago"], errors="coerce")
    else:
         raise AssertionError("[ERRO] Coluna 'valor_pago' (mapeada de 'valor') não encontrada após renomear.")


    cols_to_str = ["user_id", "unidade_origem", "beneficiario_id"]
    for cc in cols_to_str:
        # Verifica se a coluna existe antes de tentar converter
        if cc in df.columns:
            df[cc] = df[cc].astype(str).fillna("")
        else:
             raise AssertionError(f"[ERRO] Coluna '{cc}' (mapeada) não encontrada após renomear.")


    # trans_id
    if "trans_id" not in df.columns:
        df["trans_id"] = np.arange(1, len(df)+1, dtype=int)

    # cria chave 1→1 por linha para merges sem duplicar registros
    import numpy as np

    if "row_id" not in df.columns:
        # mantém a ordem atual do df e cria um id estável 0..N-1
        df = df.reset_index(drop=True).copy()
        df["row_id"] = np.arange(len(df), dtype=np.int64)

    print("row_id criado:", int(df["row_id"].min()), "→", int(df["row_id"].max()))
    print("linhas no input:", len(df))

    # Limpeza
    # Garante que as colunas essenciais para a limpeza existam
    essential_subset = ["timestamp", "valor_pago", "user_id", "beneficiario_id"]
    # Filtra subset para incluir apenas colunas que realmente existem no df após mapeamento/criação
    existing_essential_subset = [col for col in essential_subset if col in df.columns]

    # Agora que garantimos que as colunas mapeadas existem, podemos usar o subset completo para o dropna
    before = len(df)
    df = df.dropna(subset=essential_subset)
    df = df.sort_values("timestamp").reset_index(drop=True)
    print(f"Carregadas {before} linhas; após limpeza: {len(df)}")
    # Mensagem adicional isolada (Skynet)
    from IPython.display import display, HTML
    display(HTML('<div style="margin:12px 0;padding:8px 12px;border:1px dashed #999;"><b>🤖 Skynet</b>: T-800 dados incorporados, preparando para buscar na rede.</div>'))

    # Copia o input para a pasta da execução
    # Verifica se INPUT_CSV existe antes de tentar copiar
    if INPUT_CSV.exists():
        shutil.copy2(INPUT_CSV, RUN_DIR / "Input.csv")
    else:
        print(f"[AVISO] Não foi possível copiar o arquivo de input: {INPUT_CSV} não encontrado.")
        # Mensagem adicional isolada (Skynet)
        from IPython.display import display, HTML
        display(HTML('<div style="margin:12px 0;padding:8px 12px;border:1px dashed #999;"><b>🤖 Skynet</b>: T-800 não foi possível incorporar dados. Sarah Connor fugiu.</div>'))

###**Etapa 7** Criação do Grafo Temporal

---

🔎 **O que é um grafo temporal**

Um grafo temporal é uma forma de representar relações entre entidades ao longo do tempo.

Como em um grafo tradicional, temos nós (vértices) que representam agentes (pessoas, empresas, contas bancárias, sistemas).

As arestas (ligações) representam interações entre eles (por exemplo: uma transferência de dinheiro).

A diferença é que no grafo temporal cada aresta possui um carimbo de tempo (timestamp), ou seja, sabemos quando a ligação ocorreu.


Isso permite analisar não só quem se conecta com quem, mas também quando e em qual sequência.

No contexto financeiro, isso é essencial para investigar padrões de comportamento, detectar anomalias e rastrear cadeias de transações suspeitas.

---

💳 **Exemplo prático: rede de pagamentos**

Imagine um sistema de pagamentos onde cada nó é uma conta bancária e cada aresta representa um pagamento realizado.

Se João paga Maria hoje, registramos a aresta (João → Maria, valor=200, data=2025-10-02).

Se Maria transfere para Pedro amanhã, teremos (Maria → Pedro, valor=150, data=2025-10-03).

Assim conseguimos responder perguntas como: i) “Houve uma sequência de pagamentos que movimentou dinheiro rapidamente entre várias contas em poucas horas?” ou ii) “Quem são os intermediários mais frequentes em transferências de grandes valores?”

In [None]:
# @title
# ID008
G = nx.MultiDiGraph()
with tee_log(LOG_FILE):
    for _, row in tqdm(df.iterrows(), total=len(df), desc="Construindo grafo"):
        u = f"U::{row['user_id']}"
        v = f"B::{row['beneficiario_id']}"
        # Atributos de nós úteis (podem ser sobrescritos; você pode agregar)
        if u not in G:
            G.add_node(u, tipo="user")
        if v not in G:
            G.add_node(v, tipo="beneficiario")

        G.add_edge(
            u, v,
            key=row["trans_id"],
            trans_id=int(row["trans_id"]),
            timestamp=row["timestamp"],
            valor=float(row["valor_pago"]),
            unidade_origem=row["unidade_origem"] # Removido area_unidade e notacao_funcional_origem
        )

print(f"Nós: {G.number_of_nodes()} | Arestas: {G.number_of_edges()}")

# Mensagem adicional isolada (Skynet)
from IPython.display import display, HTML
display(HTML('<div style="margin:12px 0;padding:8px 12px;border:1px dashed #999;"><b>🤖 Skynet</b>: Analisando padrão de comportamento de Sarah Connor.</div>'))

###**Etapa 8:** Configuração das janelas temporais de observação

**Janelas de tempo observadas**

Foram adotadas janelas deslizantes para observar padrões de comportamento nas transações:

- "curtissimo": até 1 dia;
- "curto": até 1 semana;
- "medio": até 30 dias;
- "longo": até 120 dias (1 fechamento trimestral + folga); e
- "longuissimo": até 220 dias (1 fechamento semestral + folga).

**Features temporais extraídas**

- Frequência de transações (rolling counts): quantos pagamentos ocorreram entre as mesmas partes dentro da janela.

- Atipicidade do valor:
  - contagem bruta: número de ocorrências na janela temporal (preserva ordens de grandeza);
  - taxa por dia: permite comparatibilidade entre janelas; e
  - robust z-score: compara o valor da transação com a mediana e a dispersão histórica, destacando operações fora do padrão. São gerados z-score global, z-score por usuário e z-score por unidade funcional.

- Densidade da egonet: mede a concentração de conexões ao redor do pagador ou recebedor no snapshot da rede na janela (indica se o nó está em um canal mais estruturado de repasses).

- Burstiness: avalia se os intervalos entre transações seguem padrão explosivo (rajadas), regular ou aleatório.
---

**Registro das features**

Para cada pagamento, as métricas acima foram calculadas no momento do evento, considerando apenas o histórico até aquele instante dentro da janela definida.

**O resultado é um conjunto de atributos anexado à aresta (pagador → recebedor, valor, timestamp), permitindo análises de risco e detecção de anomalias.**

In [None]:
# @title
# ID009
from collections import deque, defaultdict
import numpy as np
import pandas as pd

# janelas em segundos
WINDOWS_SECONDS = {
    "curtissimo":  1   * 24 * 3600,   # <= 1 dia
    "curto":       7   * 24 * 3600,   # <= 1 semana
    "medio":       30  * 24 * 3600,   # <= 30 dias
    "longo":       120 * 24 * 3600,   # <= 120 dias
    "longuissimo": 220 * 24 * 3600,   # <= 220 dias
}

def _robust_z_series(s: pd.Series, eps: float = 1e-9) -> pd.Series:
    med = s.median()
    mad = (np.abs(s - med)).median()
    return 0.6745 * (s - med) / (mad + eps)

def _rolling_counts_global(times: pd.Series, wsec: int) -> pd.Series:
    # contagem deslizante global por timestamp
    q = deque()
    out = np.empty(len(times), dtype=np.int64)
    for i, t in enumerate(times):
        q.append(t)
        while q and (t - q[0]).total_seconds() > wsec:
            q.popleft()
        out[i] = len(q)
    return pd.Series(out, index=times.index)

def _rolling_counts_by_entity(times: pd.Series, entity: pd.Series, wsec: int) -> pd.Series:
    # contagem deslizante por entidade, calculada em única passada
    deques = defaultdict(deque)
    out = np.empty(len(times), dtype=np.int64)
    for i, (t, e) in enumerate(zip(times, entity)):
        q = deques[e]
        q.append(t)
        while q and (t - q[0]).total_seconds() > wsec:
            q.popleft()
        out[i] = len(q)
    return pd.Series(out, index=times.index)

def build_rolling_features_entities(
    df: pd.DataFrame,
    time_col: str = "timestamp",
    user_col: str = "userid",
    unit_col: str = "unidade_origem",
    windows_seconds: dict = WINDOWS_SECONDS,
) -> pd.DataFrame:
    """
    requer:
      df[time_col]: datetime64[ns] ou conversível via pd.to_datetime
      df[user_col]: id de usuário
      df[unit_col]: id de unidade de origem

    retorna dataframe ordenado por tempo com colunas:
      {win}_cnt, {win}_perday, {win}_zglob, {win}_zuser, {win}_zunit
    """
    # ordena por tempo e garante datetime
    work = df.copy()
    work[time_col] = pd.to_datetime(work[time_col], utc=False)
    work = work.sort_values(time_col).reset_index(drop=True)

    # container das features por janela
    feat_blocks = []

    times = work[time_col]

    for win, wsec in windows_seconds.items():
        # global
        g_cnt = _rolling_counts_global(times, wsec)
        g_perday = g_cnt / max(wsec / 86400.0, 1e-9)
        g_z = _robust_z_series(g_cnt)

        # por usuário
        u_cnt = _rolling_counts_by_entity(times, work[user_col], wsec)
        # z por usuário é calculado separadamente em cada grupo
        u_z = u_cnt.groupby(work[user_col], sort=False).transform(_robust_z_series)

        # por unidade
        n_cnt = _rolling_counts_by_entity(times, work[unit_col], wsec)
        # z por unidade é calculado separadamente em cada grupo
        n_z = n_cnt.groupby(work[unit_col], sort=False).transform(_robust_z_series)

        block = pd.DataFrame({
            f"{win}_cnt": g_cnt.values,
            f"{win}_perday": g_perday.values,
            f"{win}_zglob": g_z.values,
            f"{win}_zuser": u_z.values,
            f"{win}_zunit": n_z.values,
        }, index=work.index)

        feat_blocks.append(block)

    feats = pd.concat(feat_blocks, axis=1)
    # anexa as chaves de entidade e o timestamp ordenados, para facilitar merge posterior
    feats.insert(0, time_col, work[time_col].values)
    feats.insert(1, user_col, work[user_col].values)
    feats.insert(2, unit_col, work[unit_col].values)

    # ordena colunas por nome para consistência, mantendo chaves à frente
    prefix_cols = [time_col, user_col, unit_col]
    other_cols = sorted([c for c in feats.columns if c not in prefix_cols])
    feats = feats[prefix_cols + other_cols]
    return feats

# Mensagem adicional isolada (Skynet)
from IPython.display import display, HTML
display(HTML('<div style="margin:12px 0;padding:8px 12px;border:1px dashed #999;"><b>🤖 Skynet</b>: Identificadas as condições críticas para localização do alvo.</div>'))

###**Etapa 9:** Geração das informações nas janelas temporais

#### **Documentação das Features**
  
As features resultam da fusão entre:

- **janelas deslizantes multiescala** (1, 7, 30, 120 e 220 dias);
- **métricas temporais diretas** (intervalos e recência);
- **métricas topológicas** do grafo dinâmico;
- **medidas de raridade e irregularidade temporal** (burstiness).

---


##### Janelas deslizantes multiescala

| Sufixo / Prefixo | Tipo de Feature | Escopo | Descrição | Interpretação |
|------------------|-----------------|---------|------------|----------------|
| `{janela}_cnt` | Contagem bruta global | Todos os eventos | Número de eventos ocorridos dentro da janela (1, 7, 30, 120, 220 dias). | Mede volume absoluto de atividade global. |
| `{janela}_perday` | Taxa global por dia | Todos os eventos | Contagem da janela normalizada pelo tamanho em dias da janela. | Facilita comparação entre janelas de tamanhos diferentes. |
| `{janela}_zglob` | Z-score robusto global | Todos os eventos | Desvio do valor atual em relação à mediana e MAD globais. | Destaca anomalias no padrão geral da organização. |
| `{janela}_ucnt` | Contagem por usuário | Usuário | Número de eventos lançados pelo mesmo usuário dentro da janela. | Volume individual recente. |
| `{janela}_zuser` | Z-score robusto por usuário | Usuário | Desvio do comportamento do usuário em relação ao seu próprio histórico. | Detecta mudanças comportamentais individuais. |
| `{janela}_ncnt` | Contagem por unidade | Unidade (`unidade_origem`) | Número de eventos realizados pela unidade dentro da janela. | Volume operacional por área. |
| `{janela}_zunit` | Z-score robusto por unidade | Unidade (`unidade_origem`) | Desvio do comportamento da unidade em relação ao seu histórico. | Detecta anomalias organizacionais. |

> **Exemplo de nomes de colunas**:  
> `curtissimo_cnt`, `curto_ucnt`, `medio_zunit`, `longuissimo_perday`.

---

##### Métricas temporais diretas

| Coluna | Tipo | Descrição | Interpretação |
|---------|------|------------|----------------|
| `secs_desde_ult_trans_user` | Temporal | Segundos desde o último lançamento do mesmo usuário. | Mede recência individual; valores baixos indicam atividade intensa. |
| `secs_desde_ult_trans_par` | Temporal | Segundos desde o último lançamento do mesmo par usuário-beneficiário. | Mede recorrência transacional; útil para detecção de loops ou repetições. |

---

##### Métricas topológicas (grafo dinâmico)

| Coluna | Tipo | Descrição | Interpretação |
|---------|------|------------|----------------|
| `grau_out_user` | Estrutural | Número de beneficiários distintos para os quais o usuário realizou lançamentos (grau de saída). | Abrangência de conexões do usuário. |
| `grau_in_benef` | Estrutural | Número de usuários distintos que realizaram lançamentos para o mesmo beneficiário (grau de entrada). | Centralidade do beneficiário. |
| `grau_total_user` | Estrutural | Soma de graus de entrada e saída do usuário. | Atividade total (emissor + receptor). |
| `grau_total_benef` | Estrutural | Soma de graus do beneficiário. | Grau de envolvimento do beneficiário. |
| `egonet_density_user` | Estrutural | Densidade do subgrafo formado pelo usuário e seus vizinhos. | Mede o nível de interconexão entre os contatos do usuário; altos valores indicam cliques ou grupos coesos. |

---

##### Métricas relacionais e de irregularidade

| Coluna | Tipo | Descrição | Interpretação |
|---------|------|------------|----------------|
| `par_rareza` | Relacional | \(1 / (1 + \text{número de ocorrências prévias do par})\). | Mede quão incomum é a relação; próximo de 1 = primeira interação. |
| `par_burstiness` | Temporal/Relacional | \((\sigma - \mu) / (\sigma + \mu)\) dos intervalos entre eventos do par. | Mede irregularidade temporal da relação; 1 = explosiva, 0 = aleatória, −1 = regular. |

---

##### Notas operacionais

- Todas as colunas de janelas são calculadas **causalmente** (somente com eventos passados e o atual).  
- Os z-scores são **robustos**, baseados em mediana e MAD, evitando distorção por outliers.  
- Os tempos são expressos em **segundos**; normalizações adicionais (por dia/mês) podem ser feitas em fases posteriores.  
- As janelas temporais atuais são:  
  **1 dia (curtíssimo), 7 dias (curto), 30 dias (médio), 120 dias (longo), 220 dias (longuíssimo)**.  
- Cada evento (linha) representa um **lançamento contábil individual**, associado a:  
  `user_id`, `beneficiario_id`, `unidade_origem` e `valor_pago`.

---

##### Interpretação geral

- **Contagens e taxas**: indicam **nível de atividade**.  
- **Z-scores**: indicam **desvios comportamentais**.  
- **Graus e egonet**: medem **posição e influência na rede**.  
- **Rareza e burstiness**: capturam **irregularidade e novidade das relações**.  
- **Recência (secs)**: quantifica **tempo de inatividade**.

Essas variáveis compõem o vetor de atributos temporais que alimenta os embeddings do
modelo **Temporal Graph Network (DECOI)**, permitindo identificar padrões anômalos
tanto **em nível individual (usuário)** quanto **organizacional (unidade)**.

---

#### **Implementação matemática.**

In [None]:
# @title
# ID010

# janelas em segundos (mantém as 5 janelas)
WINDOWS_SECONDS = {
    "curtissimo":  1   * 24 * 3600,
    "curto":       7   * 24 * 3600,
    "medio":       30  * 24 * 3600,
    "longo":       120 * 24 * 3600,
    "longuissimo": 220 * 24 * 3600,
}

from collections import deque, defaultdict
import numpy as np
import pandas as pd

def _robust_z_series(s: pd.Series, eps: float = 1e-9) -> pd.Series:
    med = s.median()
    mad = (np.abs(s - med)).median()
    return 0.6745 * (s - med) / (mad + eps)

def _rolling_counts_global(times: pd.Series, wsec: int) -> pd.Series:
    q = deque()
    out = np.empty(len(times), dtype=np.int64)
    for i, t in enumerate(times):
        q.append(t)
        while q and (t - q[0]).total_seconds() > wsec:
            q.popleft()
        out[i] = len(q)
    return pd.Series(out, index=times.index)

def _rolling_counts_by_entity(times: pd.Series, entity: pd.Series, wsec: int) -> pd.Series:
    deques = defaultdict(deque)
    out = np.empty(len(times), dtype=np.int64)
    for i, (t, e) in enumerate(zip(times, entity)):
        q = deques[e]
        q.append(t)
        while q and (t - q[0]).total_seconds() > wsec:
            q.popleft()
        out[i] = len(q)
    return pd.Series(out, index=times.index)

# janelas multiescala com row_id preservado
def build_rolling_features_entities(
    df: pd.DataFrame,
    time_col: str = "timestamp",
    user_col: str = "user_id",
    unit_col: str = "unidade_origem",
    row_id_col: str = "row_id",
    windows_seconds: dict = WINDOWS_SECONDS,
) -> pd.DataFrame:
    work = df[[row_id_col, time_col, user_col, unit_col]].copy()
    work[time_col] = pd.to_datetime(work[time_col], utc=False, errors="coerce")
    work = work.sort_values(time_col).reset_index(drop=True)

    times = work[time_col]
    users = work[user_col]
    units = work[unit_col]

    feat_blocks = []
    for win, wsec in windows_seconds.items():
        g_cnt = _rolling_counts_global(times, wsec)
        g_perday = g_cnt / max(wsec / 86400.0, 1e-9)
        g_z = _robust_z_series(g_cnt)

        u_cnt = _rolling_counts_by_entity(times, users, wsec)
        u_z = u_cnt.groupby(users, sort=False).transform(_robust_z_series)

        n_cnt = _rolling_counts_by_entity(times, units, wsec)
        n_z = n_cnt.groupby(units, sort=False).transform(_robust_z_series)

        block = pd.DataFrame({
            f"{win}_cnt":     g_cnt.values,
            f"{win}_perday":  g_perday.values,
            f"{win}_zglob":   g_z.values,
            f"{win}_ucnt":    u_cnt.values,
            f"{win}_zuser":   u_z.values,
            f"{win}_ncnt":    n_cnt.values,
            f"{win}_zunit":   n_z.values,
        }, index=work.index)
        feat_blocks.append(block)

    feats = pd.concat(feat_blocks, axis=1)
    feats.insert(0, row_id_col, work[row_id_col].values)

    prefix = [row_id_col]
    others = sorted([c for c in feats.columns if c not in prefix])
    return feats[prefix + others]

# Mensagem adicional isolada (Skynet)
from IPython.display import display, HTML
display(HTML('<div style="margin:12px 0;padding:8px 12px;border:1px dashed #999;"><b>🤖 Skynet</b>: Informações de comportamento processadas.</div>'))

###**Etapa 10:** Configuração dos modelos matemáticos

**Modelos utilizados**

**Isolation Forest (IF)**

**Mecânica:** cria várias árvores de decisão que “isolam” pontos. Quanto menos cortes são necessários para separar uma observação, mais anômala ela é.

**Motivo da escolha:** bom para detectar transações raras ou de valor atípico em grandes volumes de dados.

**No exemplo:** um pagamento muito acima da média do usuário pode ser isolado rapidamente → sinal de anomalia.

---
**Local Outlier Factor (LOF)**

**Mecânica:** compara a densidade local de vizinhos. Pontos em regiões menos densas são marcados como outliers.

**Motivo da escolha:** captura anomalias contextuais, ou seja, pagamentos que parecem “normais” globalmente, mas destoam do comportamento em seu grupo.

**No exemplo:** se uma conta sempre paga fornecedores fixos e de repente paga um novo beneficiário, o LOF detecta que o padrão local mudou.

---

**One-Class SVM (opcional)**

**Mecânica:** aprende a fronteira do espaço “normal” e marca pontos fora dela como anômalos.

**Motivo da escolha:** útil em cenários onde se deseja maior controle da taxa de outliers (via parâmetro nu).

**No exemplo:** pode ajudar a identificar transferências fora do perfil quando só há poucos históricos para treinar.

---

⚖️ **Regras adicionais**

- Robust Z-score: avalia se o valor do pagamento é distante da mediana histórica (robusto a outliers).

- Rareza: se a relação pagador→beneficiário é pouco frequente, maior chance de anomalia.

- Burstiness: mede explosões de atividade (ex.: vários pagamentos em minutos, após dias sem atividade).



---

🔀 **Uso blended (ensemble por ranking)**

- Cada modelo gera um score de anomalia.

- Em vez de escolher um único, os scores são convertidos em ranks e depois combinados (média).

- Essa abordagem reduz o viés de um modelo só e fortalece sinais consistentes.

- O resultado é um ensemble_score, cortado por percentil (ex.: 97,5%), para definir os eventos anômalos.



---

📌 **Resumo para o exemplo de monitoramento de pagamentos:**

O sistema combina três algoritmos não supervisionados + features baseadas em regras para capturar tanto anomalias globais (Isolation Forest), quanto locais (LOF), quanto estruturais (SVM/regra). O blended via ranking (score conjunto)  garante robustez, evitando que um único modelo domine a decisão.

---
**IMPORTANTE:** Primeira linha deste código configura o percentual para corte e identificação de anomalia.

Isso significa que os modelos tentarão encontrar anomalias em um intervalo de confiança de 97,5% (ATUAL). Quanto menor o percentual de confiança, maior o número de "anomalias" (candidatos) encontrados e, possivelmente, maior o número de FALSO POSITIVOS. Quanto maior o percentual de confiança, mais exigente é o modelo para determinar se algo é realmente fora do comum.

---
Implementar o intervalo de confiança como uma configuração (variável) no começo do Código **TODO[005]** *prioridade média*

In [None]:
# @title
# ID011

# pré-requisitos esperados no ambiente (defina antes, como já fizemos nas etapas anteriores):
# - WINDOWS_SECONDS (1, 7, 30, 120, 220 dias)
# - build_rolling_features_entities(df, time_col, user_col, unit_col)
# - egonet_density, burstiness
# - seu dataframe df com colunas ['timestamp','user_id','unidade_origem','beneficiario_id','valor_pago']

import os
import numpy as np
import pandas as pd
from collections import defaultdict, deque
from IPython.display import display, HTML

from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM
from sklearn.preprocessing import StandardScaler

# reduz ruído de threads no colab (opcional)
os.environ.setdefault("OMP_NUM_THREADS", "1")
os.environ.setdefault("OPENBLAS_NUM_THREADS", "1")
os.environ.setdefault("MKL_NUM_THREADS", "1")
os.environ.setdefault("VECLIB_MAXIMUM_THREADS", "1")
os.environ.setdefault("NUMEXPR_NUM_THREADS", "1")

def _require_columns(df, cols, ctx="df"):
    missing = [c for c in cols if c not in df.columns]
    if missing:
        raise KeyError(f"{ctx}: faltam colunas: {missing}")

# raridade/burst por par devolvendo row_id
def compute_pair_rarity_burst(
    df,
    time_col="timestamp",
    user_col="user_id",
    benef_col="beneficiario_id",
    row_id_col="row_id",
    window_seconds=120*24*3600
):
    w = df[[row_id_col, time_col, user_col, benef_col]].copy()
    w[time_col] = pd.to_datetime(w[time_col], utc=False, errors="coerce")
    w = w.sort_values(time_col).reset_index(drop=True)

    dq = defaultdict(deque)
    rare = np.empty(len(w), dtype=float)
    burst = np.empty(len(w), dtype=float)

    for i, (t, u, b) in enumerate(zip(w[time_col], w[user_col], w[benef_col])):
        key = (u, b)
        q = dq[key]
        while q and (t - q[0]).total_seconds() > window_seconds:
            q.popleft()

        rare[i] = 1.0 / (1.0 + len(q))

        seq = list(q) + [t]
        if len(seq) >= 2:
            inter = np.diff([s.timestamp() for s in seq])
            mu = inter.mean(); sigma = inter.std()
            burst[i] = 0.0 if (sigma + mu) == 0 else (sigma - mu) / (sigma + mu)
        else:
            burst[i] = 0.0

        q.append(t)

    return pd.DataFrame({
        row_id_col: w[row_id_col].values,
        "par_rareza": rare,
        "par_burstiness": burst
    })

# montagem final das features com merge por row_id
def build_feat_df(
    df: pd.DataFrame,
    time_col: str = "timestamp",
    user_col: str = "user_id",
    unit_col: str = "unidade_origem",
    benef_col: str = "beneficiario_id",
    include_pair_counts: bool = False,
    include_value_stats: bool = False,
    verbose: bool = True
) -> pd.DataFrame:
    assert "row_id" in df.columns, "Crie df['row_id'] na ID007 antes de prosseguir."

    if verbose: print("build_feat_df: janelas multiescala…")
    df_roll = build_rolling_features_entities(
        df, time_col=time_col, user_col=user_col, unit_col=unit_col, row_id_col="row_id"
    )

    if verbose: print("build_feat_df: merge base + janelas por row_id…")
    feat_df = df.merge(df_roll, on="row_id", how="left", sort=False)
    assert len(feat_df) == len(df), "Merge com df_roll alterou o número de linhas."

    if verbose: print("build_feat_df: raridade/burst por par…")
    df_pairrb = compute_pair_rarity_burst(
        df, time_col=time_col, user_col=user_col, benef_col=benef_col, row_id_col="row_id"
    )
    feat_df = feat_df.merge(df_pairrb, on="row_id", how="left", sort=False)
    assert len(feat_df) == len(df), "Merge com df_pairrb alterou o número de linhas."

    if include_pair_counts:
        raise NotImplementedError("Ative somente se a função de *_pcnt também devolver row_id.")

    if include_value_stats:
        raise NotImplementedError("Ative somente se a função de valor 30d também devolver row_id.")

    feat_df = feat_df.sort_values(time_col).reset_index(drop=True)

    if verbose:
        print("build_feat_df: ok. shape:", feat_df.shape)
        print("sanity:", len(feat_df), "==", len(df), "→", len(feat_df) == len(df))
    return feat_df

def detect_anomalies(
    feat_df: pd.DataFrame,
    use_ocsvm: bool = False,
    anomaly_percentile: float = 97.5,
    windows_for_multi = ("curtissimo", "curto", "medio"),
    seed: int = 42,
    verbose: bool = True
) -> pd.DataFrame:
    if verbose: print("detect_anomalies: iniciando seleção de features...")
    base_feats = [
        "valor_pago",
        "secs_desde_ult_trans_user", "secs_desde_ult_trans_par",
        "grau_out_user", "grau_in_benef", "grau_total_user", "grau_total_benef",
        "egonet_density_user",
        "par_rareza", "par_burstiness"
    ]
    multi_feats = []
    for w in windows_for_multi:
        for col in (f"{w}_ucnt", f"{w}_ncnt", f"{w}_perday", f"{w}_zglob", f"{w}_zuser", f"{w}_zunit"):
            if col in feat_df.columns:
                multi_feats.append(col)
    optional_pair_feats = [c for c in feat_df.columns if c.endswith("_pcnt")]
    optional_value_feats = [c for c in ["user_valor_median_30d","par_valor_median_30d","user_robust_z","par_robust_z"] if c in feat_df.columns]
    model_features = [c for c in (base_feats + multi_feats + optional_pair_feats + optional_value_feats) if c in feat_df.columns]
    if not model_features:
        raise RuntimeError("nenhuma feature disponível para detecção. verifique a montagem do feat_df.")
    if verbose:
        print("detect_anomalies: total de features usadas:", len(model_features))

    X = feat_df[model_features].fillna(0.0).replace([np.inf, -np.inf], 0.0).values
    scaler = StandardScaler()
    Xs = scaler.fit_transform(X)

    if verbose: print("detect_anomalies: isolation forest...")
    iso = IsolationForest(n_estimators=300, max_samples='auto', contamination='auto', random_state=seed, n_jobs=-1)
    iso.fit(Xs)
    iso_score = -iso.score_samples(Xs).astype(float)

    if verbose: print("detect_anomalies: lof...")
    lof = LocalOutlierFactor(n_neighbors=35, contamination='auto', novelty=False, n_jobs=-1)
    _ = lof.fit_predict(Xs)
    lof_cont = -lof.negative_outlier_factor_.astype(float)

    if use_ocsvm:
        if verbose: print("detect_anomalies: one-class svm...")
        ocs = OneClassSVM(gamma='scale', nu=0.01)
        ocs.fit(Xs)
        ocs_score = -ocs.decision_function(Xs).ravel().astype(float)
    else:
        ocs_score = np.zeros(len(Xs), dtype=float)

    if "curto_zuser" in feat_df.columns:
        rz_user_rule = np.clip(feat_df["curto_zuser"].values, 0, None).astype(float)
    elif "curtissimo_zuser" in feat_df.columns:
        rz_user_rule = np.clip(feat_df["curtissimo_zuser"].values, 0, None).astype(float)
    else:
        rz_user_rule = np.zeros(len(feat_df), dtype=float)

    if "curto_zunit" in feat_df.columns:
        rz_unit_rule = np.clip(feat_df["curto_zunit"].values, 0, None).astype(float)
    elif "curtissimo_zunit" in feat_df.columns:
        rz_unit_rule = np.clip(feat_df["curtissimo_zunit"].values, 0, None).astype(float)
    else:
        rz_unit_rule = np.zeros(len(feat_df), dtype=float)

    if "user_robust_z" in feat_df.columns:
        rz_user_val = np.clip(np.nan_to_num(feat_df["user_robust_z"].values, nan=0.0), 0, None).astype(float)
    else:
        rz_user_val = np.zeros(len(feat_df), dtype=float)

    if "par_robust_z" in feat_df.columns:
        rz_par_val = np.clip(np.nan_to_num(feat_df["par_robust_z"].values, nan=0.0), 0, None).astype(float)
    else:
        rz_par_val = np.zeros(len(feat_df), dtype=float)

    rare_score = feat_df["par_rareza"].values.astype(float)
    burst_score = np.clip(feat_df["par_burstiness"].values, 0, None).astype(float)

    scores = pd.DataFrame({
        "iso": iso_score,
        "lof": lof_cont,
        "ocsvm": ocs_score,
        "rz_user_cnt": rz_user_rule,
        "rz_unit_cnt": rz_unit_rule,
        "rz_user_val": rz_user_val,
        "rz_par_val": rz_par_val,
        "rare": rare_score,
        "burst": burst_score
    }, index=feat_df.index)

    ranks = scores.rank(method="average", ascending=True)
    ensemble_rank = ranks.mean(axis=1)
    ensemble_score = (ensemble_rank - ensemble_rank.min()) / (ensemble_rank.max() - ensemble_rank.min() + 1e-9)

    out = feat_df.copy()
    out = pd.concat([out, scores.add_prefix("score_")], axis=1)
    out["ensemble_rank"] = ensemble_rank
    out["ensemble_score"] = ensemble_score

    threshold = np.percentile(ensemble_score, anomaly_percentile)
    out["is_anomaly"] = (out["ensemble_score"] >= threshold).astype(int)

    print(f"corte (percentil {anomaly_percentile}%): {threshold:.4f}")
    print("total anomalias:", int(out["is_anomaly"].sum()), "de", len(out))

    # skynet
    num_anomalies = int(out["is_anomaly"].sum())
    display(HTML(f'<div style="margin:12px 0;padding:8px 12px;border:1px dashed #999;">🤖 Skynet: Identificadas {num_anomalies} anomalias.</div>'))

    return out

def run_full_detection(
    df,
    include_pair_counts=False,
    include_value_stats=False,
    use_ocsvm=False,
    anomaly_percentile=97.5,
    windows_for_multi=("curtissimo","curto","medio"),
    seed=42,
    verbose=True
):
    if verbose: print("run_full_detection: montando feat_df...")
    feat_df = build_feat_df(
        df,
        time_col="timestamp",
        user_col="user_id",
        unit_col="unidade_origem",
        benef_col="beneficiario_id",
        include_pair_counts=include_pair_counts,
        include_value_stats=include_value_stats,
        verbose=verbose
    )
    if verbose: print("run_full_detection: detectando anomalias...")
    feat_df = detect_anomalies(
        feat_df,
        use_ocsvm=use_ocsvm,
        anomaly_percentile=anomaly_percentile,
        windows_for_multi=windows_for_multi,
        seed=seed,
        verbose=verbose
    )
    print("execução concluída.")
    return feat_df

In [None]:
# @title
# ID012 — execução
feat_df = build_feat_df(
    df,
    time_col="timestamp",
    user_col="user_id",
    unit_col="unidade_origem",
    benef_col="beneficiario_id",
    include_pair_counts=False,
    include_value_stats=False,
    verbose=True
)

feat_df = detect_anomalies(
    feat_df,
    use_ocsvm=False,
    anomaly_percentile=97.5,
    windows_for_multi=("curtissimo","curto","medio"),
    seed=42,
    verbose=True
)

print("linhas no input:", len(df), "linhas no feat_df:", len(feat_df))
assert len(feat_df) == len(df), "Cardinalidade alterada — verifique merges por row_id."

###**Etapa 11:** Salva arquivo com a análise realizada.

São criadas subpastas para cada data/hora de execução do código

In [None]:
# @title
# ID013 — salvar resultados e visões operacionais (alinhado ao RUN_DIR e fuso São Paulo)
import os, json, datetime as dt
from zoneinfo import ZoneInfo
import numpy as np
import pandas as pd

# base padrão do projeto (usada apenas se RUN_DIR não existir)
OUTPUT_DIR_DEFAULT = "/content/drive/MyDrive/Notebooks/temporal-graph-network/output"
SAO_TZ = None
try:
    SAO_TZ = ZoneInfo("America/Sao_Paulo")
except Exception:
    SAO_TZ = None  # fallback sem timezone explícito

def _cols_if_exist(df, cols):
    return [c for c in cols if c in df.columns]

def _topN_por_grupo(df, group_col, sort_col="ensemble_score", n_top=50):
    if group_col not in df.columns:
        return pd.DataFrame(columns=[group_col, sort_col])
    return (
        df.sort_values(sort_col, ascending=False)
          .groupby(group_col, group_keys=False)
          .head(n_top)
    )

def _serie_diaria(df, timestamp_col="timestamp", score_col="ensemble_score", flag_col="is_anomaly"):
    if timestamp_col not in df.columns:
        return pd.DataFrame(columns=["data","anomalias","avg_ensemble_score"])
    w = df[[timestamp_col, score_col] + ([flag_col] if flag_col in df.columns else [])].copy()
    w["data"] = pd.to_datetime(w[timestamp_col], errors="coerce").dt.date
    grp = w.groupby("data", dropna=True)
    out = pd.DataFrame({
        "anomalias": grp[flag_col].sum() if flag_col in w.columns else grp[score_col].size(),
        "avg_ensemble_score": grp[score_col].mean()
    }).reset_index()
    return out

def _anomalias_por_unidade_dia(df, timestamp_col="timestamp", unidade_col="unidade_origem",
                               score_col="ensemble_score", flag_col="is_anomaly"):
    if timestamp_col not in df.columns or unidade_col not in df.columns:
        return pd.DataFrame(columns=[unidade_col, "data", "anomalias", "avg_ensemble_score"])
    w = df[[timestamp_col, unidade_col, score_col] + ([flag_col] if flag_col in df.columns else [])].copy()
    w["data"] = pd.to_datetime(w[timestamp_col], errors="coerce").dt.date
    grp = w.groupby([unidade_col, "data"], dropna=True)
    out = pd.DataFrame({
        "anomalias": grp[flag_col].sum() if flag_col in w.columns else grp[score_col].size(),
        "avg_ensemble_score": grp[score_col].mean()
    }).reset_index()
    return out

def _top_features_explicabilidade(df, score_prefix="score_", flag_col="is_anomaly"):
    score_cols = [c for c in df.columns if c.startswith(score_prefix)]
    if not score_cols:
        return pd.DataFrame(columns=["feature","media_anomalia","media_geral","impacto_relativo"])
    df_anom = df[df[flag_col] == 1] if flag_col in df.columns else df
    media_anom = df_anom[score_cols].mean().rename("media_anomalia")
    media_geral = df[score_cols].mean().rename("media_geral")
    explic = pd.concat([media_anom, media_geral], axis=1)
    explic["impacto_relativo"] = explic["media_anomalia"] / (explic["media_geral"] + 1e-9)
    explic = explic.reset_index().rename(columns={"index": "feature"})
    explic = explic.sort_values("impacto_relativo", ascending=False)
    return explic

def _data_dictionary(df):
    return pd.DataFrame({
        "column": df.columns,
        "dtype": [str(dt) for dt in df.dtypes]
    })

def save_results_id013(
    df_input: pd.DataFrame,
    feat_df: pd.DataFrame,
    output_dir: str = OUTPUT_DIR_DEFAULT,
    anomaly_percentile: float = 97.5,
    top_k_anomalies: int = 1000,
    top_n_por_grupo: int = 50
):
    # sanity checks
    if feat_df is None or len(feat_df) == 0:
        raise RuntimeError("feat_df está vazio ou não foi gerado.")
    if df_input is None or len(df_input) == 0:
        raise RuntimeError("df de entrada está vazio ou não foi carregado.")

    # determina a pasta de execução: preferir RUN_DIR; caso não exista, criar TGN_... em output_dir
    if 'RUN_DIR' in globals() and RUN_DIR is not None and os.path.isdir(str(RUN_DIR)):
        outdir = str(RUN_DIR)
        print("Usando RUN_DIR existente:", outdir)
    else:
        ts = (dt.datetime.now(SAO_TZ).strftime("%Y-%m-%d_%H-%M-%S")
              if SAO_TZ else dt.datetime.now().strftime("%Y-%m-%d_%H-%M-%S"))
        outdir = os.path.join(output_dir, f"TGN_{ts}")
        os.makedirs(outdir, exist_ok=True)
        print("RUN_DIR não definido; criada pasta de execução:", outdir)

    # ordena por score
    if "ensemble_score" not in feat_df.columns:
        raise KeyError("coluna 'ensemble_score' não encontrada em feat_df.")
    df_sorted = feat_df.sort_values("ensemble_score", ascending=False)

    # threshold de documentação
    threshold = float(np.percentile(df_sorted["ensemble_score"].values, anomaly_percentile))

    # colunas recomendadas
    key_cols   = _cols_if_exist(feat_df, ["row_id","trans_id","timestamp","user_id","beneficiario_id","unidade_origem","valor_pago"])
    flag_cols  = _cols_if_exist(feat_df, ["ensemble_score","ensemble_rank","is_anomaly"])
    score_cols = [c for c in feat_df.columns if c.startswith("score_")]
    extra_cols = _cols_if_exist(feat_df, [
        "secs_desde_ult_trans_user","secs_desde_ult_trans_par",
        "par_rareza","par_burstiness",
        "grau_out_user","grau_in_benef","grau_total_user","grau_total_benef","egonet_density_user"
    ])
    export_cols = key_cols + flag_cols + score_cols + extra_cols
    export_cols += [c for c in feat_df.columns if c not in export_cols]  # inclui o resto ao final

    # caminhos de saída (nomes padronizados)
    path_csv_full   = os.path.join(outdir, "full.csv")
    path_parq_full  = os.path.join(outdir, "full.parquet")
    path_csv_anom   = os.path.join(outdir, "anomalies.csv")
    path_manifest   = os.path.join(outdir, "manifest.json")

    # salva full CSV
    df_sorted.to_csv(path_csv_full, index=False, columns=export_cols)

    # salva full Parquet
    used_parquet = True
    try:
        df_sorted.to_parquet(path_parq_full, index=False)
    except Exception as e:
        print("aviso: to_parquet falhou; manteremos apenas CSV. erro:", repr(e))
        used_parquet = False

    # salva anomalias (ou top-k quando flag ausente)
    if "is_anomaly" in df_sorted.columns:
        df_anom = df_sorted[df_sorted["is_anomaly"] == 1].copy()
    else:
        df_anom = df_sorted.head(top_k_anomalies).copy()
    df_anom.to_csv(path_csv_anom, index=False, columns=export_cols)

    # visões adicionais
    views = {}

    # top-N por usuário
    top_user = _topN_por_grupo(df_anom if len(df_anom) else df_sorted, "user_id", "ensemble_score", n_top=top_n_por_grupo)
    path_top_user = os.path.join(outdir, f"top{top_n_por_grupo}_por_usuario.csv")
    top_user.to_csv(path_top_user, index=False)
    views["top_user"] = path_top_user

    # top-N por unidade
    top_unit = _topN_por_grupo(df_anom if len(df_anom) else df_sorted, "unidade_origem", "ensemble_score", n_top=top_n_por_grupo)
    path_top_unit = os.path.join(outdir, f"top{top_n_por_grupo}_por_unidade.csv")
    top_unit.to_csv(path_top_unit, index=False)
    views["top_unidade"] = path_top_unit

    # top-N por beneficiário
    top_benef = _topN_por_grupo(df_anom if len(df_anom) else df_sorted, "beneficiario_id", "ensemble_score", n_top=top_n_por_grupo)
    path_top_benef = os.path.join(outdir, f"top{top_n_por_grupo}_por_beneficiario.csv")
    top_benef.to_csv(path_top_benef, index=False)
    views["top_beneficiario"] = path_top_benef

    # série temporal diária
    serie = _serie_diaria(df_sorted, timestamp_col="timestamp", score_col="ensemble_score", flag_col="is_anomaly")
    path_serie = os.path.join(outdir, "anomalias_por_dia.csv")
    serie.to_csv(path_serie, index=False)
    views["anomalias_por_dia"] = path_serie

    # anomalias por unidade × dia
    serie_unid = _anomalias_por_unidade_dia(df_sorted, timestamp_col="timestamp", unidade_col="unidade_origem",
                                            score_col="ensemble_score", flag_col="is_anomaly")
    path_serie_unid = os.path.join(outdir, "anomalias_por_unidade_dia.csv")
    serie_unid.to_csv(path_serie_unid, index=False)
    views["anomalias_por_unidade_dia"] = path_serie_unid

    # top features (explicabilidade simples)
    top_feat = _top_features_explicabilidade(df_sorted, score_prefix="score_", flag_col="is_anomaly")
    path_top_feat = os.path.join(outdir, "top_features.csv")
    top_feat.to_csv(path_top_feat, index=False)
    views["top_features"] = path_top_feat

    # data dictionary
    dict_df = _data_dictionary(df_sorted)
    path_dict = os.path.join(outdir, "data_dictionary.csv")
    dict_df.to_csv(path_dict, index=False)
    views["data_dictionary"] = path_dict

    # manifest
    paths = {
        "csv_full": path_csv_full,
        "csv_anomalies": path_csv_anom,
        **views
    }
    if used_parquet:
        paths["parquet_full"] = path_parq_full

    manifest = {
        "timestamp_execucao": (dt.datetime.now(SAO_TZ).isoformat() if SAO_TZ else dt.datetime.now().isoformat()),
        "output_dir": outdir,
        "input_rows": int(len(df_input)),
        "output_rows": int(len(feat_df)),
        "anomaly_percentile": float(anomaly_percentile),
        "threshold_ensemble_score": threshold,
        "paths": paths,
        "columns": {
            "keys": key_cols,
            "flags": flag_cols,
            "scores": score_cols
        }
    }
    with open(path_manifest, "w", encoding="utf-8") as f:
        json.dump(manifest, f, ensure_ascii=False, indent=2)

    print("resultados salvos em:", outdir)
    print("arquivos gerados:")
    for k, v in paths.items():
        print(" -", k, ":", v)
    print(" - manifest:", path_manifest)

    return {
        "outdir": outdir,
        "paths": paths,
        "manifest": path_manifest
    }

# exemplo de uso logo após a etapa de detecção:
# save_info = save_results_id013(df, feat_df, output_dir=OUTPUT_DIR_DEFAULT, anomaly_percentile=97.5, top_k_anomalies=1000, top_n_por_grupo=50)

In [None]:
# @title
# ID013-CONT1
from pathlib import Path
import os, json

base = Path("/content/drive/MyDrive/Notebooks/temporal-graph-network/output")
curr = Path(str(RUN_DIR)) if 'RUN_DIR' in globals() else None
print("RUN_DIR atual:", curr)

def list_dir(p):
    if not p or not p.exists():
        print("(!) pasta inexistente:", p);
        return
    print("Conteúdo de", p, ":")
    for f in sorted(p.iterdir()):
        print(" -", f.name)

list_dir(curr)

need = {"full.parquet","full.csv","anomalies.csv","manifest.json"}
have = {x.name for x in curr.iterdir()} if curr and curr.exists() else set()
missing = sorted(need - have)
print("Faltam no RUN_DIR atual:", missing)

# Procura a última pasta que realmente tenha full/anomalies
cands = []
for d in sorted(base.iterdir(), key=os.path.getmtime, reverse=True):
    if not d.is_dir():
        continue
    names = {x.name for x in d.iterdir()}
    if ("full.parquet" in names or "full.csv" in names) and ("anomalies.csv" in names):
        cands.append(d)
        break

if cands:
    print("Pasta com dados prontos encontrada:", cands[0])
    list_dir(cands[0])
else:
    print("Nenhuma pasta anterior com full/anomalies encontrada.")

In [None]:
# @title
# ID013-CONT2
# garanta estes nomes:
OUTPUT_DIR_DEFAULT = "/content/drive/MyDrive/Notebooks/temporal-graph-network/output"

save_info = save_results_id013(
    df_input=df,
    feat_df=feat_df,
    output_dir=OUTPUT_DIR_DEFAULT,
    anomaly_percentile=97.5,
    top_k_anomalies=1000,
    top_n_por_grupo=50
)
print("Gravou em:", save_info["outdir"])

###**Etapa 12:** Análise Gráfica

Distribuição e Top-N (com corte por percentil e K sugerido por maior gap)

Essa análise gera dois gráficos que ajudam a entender como os escores de anomalia (“ensemble_score”) estão distribuídos e quais transações são mais suspeitas:

---
**Histograma – Distribuição do ensemble_score**

Mostra a frequência dos escores em toda a base.

O que procurar:
- A linha pontilhada indica o percentil de corte (ex.: Percentil 97,5). Deve ser verificado se ela cai na região da cauda, o que significa que só os casos mais extremos serão analisados (evitando falsos positivos).
- Se a maior parte dos casos está em valores baixos/médios e existe uma cauda à direita (valores muito altos), esses pontos de cauda são os candidatos a anomalias.
- Observar o valor correspondente ao percentil estabelecido para complementar a próxima análise.
---
**Gráfico de linha – Top N transações mais anômalas**

Ordena os maiores escores (rank 1 = mais anômala).

O que procurar:
- Grandes saltos (“gaps”) entre ranks consecutivos: indicam que as transações até o salto são bem mais anômalas do que as demais — são as que merecem atenção imediata.
- Observar os valores de escore de anomalia encontrados nos N registros mais anômalos versus o valor correspondente ao percentil estabelecido. Quanto maior a diferença, mais anômalo.
- Platô (curva que se estabiliza): mostra a partir de que ponto (K) os casos deixam de ser tão excepcionais. Observar o K sugerido pelo maior gap, ele indica quantos casos devem ser priorizados na revisão manual. Até K registros são candidatos muito fortes para anomalias e exigem revisão manual.
---

Em resumo: os gráficos servem para decidir onde cortar e quais transações revisar primeiro.

In [None]:
# @title
# ID014 — análise estatística e visual das anomalias (salva PNGs em FIG_DIR)
import os, json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator
from google.colab import drive

# ===== CONFIGURAÇÕES =====
OUTPUT_DIR_BASE = "/content/drive/MyDrive/Notebooks/temporal-graph-network/output"
SAVE_PLOTS = True  # defina False se não quiser salvar PNGs

# estilo básico
plt.rcParams["figure.figsize"] = (10, 5)
plt.rcParams["axes.grid"] = True
plt.rcParams["font.size"] = 11

# ===== DRIVE E BASE =====
if not os.path.ismount("/content/drive"):
    print("Montando Google Drive...")
    drive.mount("/content/drive")
else:
    print("Google Drive já montado.")
os.makedirs(OUTPUT_DIR_BASE, exist_ok=True)
print("Base:", OUTPUT_DIR_BASE)

# ===== HELPERS =====
def _ensure_fig_dir(latest_dir):
    # usa FIG_DIR do RUN_DIR se disponível; senão cria fallback
    fig_dir = os.path.join(latest_dir, "figuras")
    os.makedirs(fig_dir, exist_ok=True)
    return fig_dir

_fig_counter = {"n": 0}
def _save_show(fig, fig_dir, name_hint):
    if SAVE_PLOTS:
        _fig_counter["n"] += 1
        fname = f"{_fig_counter['n']:02d}_{name_hint}.png".replace(" ", "_")
        path = os.path.join(fig_dir, fname)
        fig.savefig(path, dpi=150, bbox_inches="tight")
        print("Figura salva:", path)
    plt.show()

def _plot_hist(data, title, xlabel, ylabel="Frequência", bins=50, threshold=None, fig_dir=None):
    fig = plt.figure()
    plt.hist(data, bins=bins, alpha=0.7)
    if threshold is not None:
        plt.axvline(threshold, color="red", linestyle="--", label=f"threshold = {threshold:.4f}")
        plt.legend()
    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.tight_layout()
    _save_show(fig, fig_dir, title)

def _pick_run_dir():
    # 1) se RUN_DIR existe e tem dados, usa
    if "RUN_DIR" in globals():
        rd = str(RUN_DIR)
        if os.path.isdir(rd):
            names = set(os.listdir(rd))
            if (("full.parquet" in names) or ("full.csv" in names)) and ("anomalies.csv" in names):
                print("Usando RUN_DIR ativo:", rd)
                return rd
            else:
                print("RUN_DIR ativo não tem full/anomalies — procurando pasta anterior com dados...")

    # 2) varre /output e pega a mais recente com full/anomalies
    subdirs = [
        os.path.join(OUTPUT_DIR_BASE, d)
        for d in os.listdir(OUTPUT_DIR_BASE)
        if os.path.isdir(os.path.join(OUTPUT_DIR_BASE, d))
    ]
    if not subdirs:
        raise RuntimeError("Nenhuma subpasta encontrada em /output.")
    subdirs.sort(key=os.path.getmtime, reverse=True)

    for d in subdirs:
        names = set(os.listdir(d))
        if (("full.parquet" in names) or ("full.csv" in names)) and ("anomalies.csv" in names):
            print("Usando pasta com dados:", d)
            return d

    # 3) se nada tiver dados, devolve a mais recente e deixamos falhar com diagnóstico
    chosen = subdirs[0]
    print("Atenção: nenhuma pasta com dados completos; usando a mais recente:", chosen)
    return chosen

def _load_paths(latest_dir):
    manifest_files = [f for f in os.listdir(latest_dir) if f.endswith("manifest.json")]
    full_path = None
    anom_path = None
    threshold = None

    if manifest_files:
        mpath = os.path.join(latest_dir, manifest_files[0])
        try:
            with open(mpath, "r", encoding="utf-8") as f:
                m = json.load(f)
            print("Manifest carregado:", mpath)
            threshold = m.get("threshold_ensemble_score") or m.get("threshold", None)
            paths = m.get("paths", {})
            cand_full = [paths.get(k) for k in ["parquet_full", "csv_full"] if paths.get(k)]
            cand_anom = [paths.get(k) for k in ["csv_anomalies"] if paths.get(k)]
            if cand_full:
                full_path = cand_full[0]
                if not os.path.isabs(full_path):
                    full_path = os.path.join(latest_dir, os.path.basename(full_path))
            if cand_anom:
                anom_path = cand_anom[0]
                if not os.path.isabs(anom_path):
                    anom_path = os.path.join(latest_dir, os.path.basename(anom_path))
        except Exception as e:
            print("Aviso: falha ao ler manifest:", repr(e))

    if not full_path:
        cands = [f for f in os.listdir(latest_dir) if f.endswith("_full.parquet")]
        if not cands and "full.parquet" in os.listdir(latest_dir):
            cands = ["full.parquet"]
        if not cands:
            cands = [f for f in os.listdir(latest_dir) if f.lower().endswith(".parquet")]
        if cands:
            full_path = os.path.join(latest_dir, cands[0])

    if not anom_path:
        cands = [f for f in os.listdir(latest_dir) if f.endswith("_anomalies.csv")]
        if not cands and "anomalies.csv" in os.listdir(latest_dir):
            cands = ["anomalies.csv"]
        if not cands:
            cands = [f for f in os.listdir(latest_dir)
                     if f.lower().endswith(".csv") and "anom" in f.lower()]
        if cands:
            anom_path = os.path.join(latest_dir, cands[0])

    if not full_path or not anom_path:
        print("\nConteúdo da pasta para diagnóstico:")
        for f in sorted(os.listdir(latest_dir)):
            print(" -", f)

    if not full_path:
        raise FileNotFoundError("Não encontrei dataset 'full' (parquet/csv) em " + latest_dir)
    if not anom_path:
        raise FileNotFoundError("Não encontrei arquivo de 'anomalias' (csv) em " + latest_dir)

    return full_path, anom_path, threshold

# ===== EXECUÇÃO =====
latest_dir = _pick_run_dir()
fig_dir = _ensure_fig_dir(latest_dir)
full_path, anom_path, threshold = _load_paths(latest_dir)

if full_path.endswith(".parquet"):
    df_full = pd.read_parquet(full_path)
else:
    df_full = pd.read_csv(full_path)
df_anom = pd.read_csv(anom_path)

print(f"linhas totais: {len(df_full):,} | anomalias: {len(df_anom):,}")
print("Arquivos usados:\n - full:", full_path, "\n - anomalies:", anom_path, "\n - threshold:", threshold)
print("Figuras serão salvas em:", fig_dir) if SAVE_PLOTS else None

# ===== ESTATÍSTICA DESCRITIVA DO INPUT =====
print("\n[descrição estatística de valor_pago]")
if "valor_pago" in df_full.columns:
    desc = df_full["valor_pago"].describe(percentiles=[.01,.05,.1,.25,.5,.75,.9,.95,.99])
    print(desc)
    _plot_hist(df_full["valor_pago"], "Distribuição de Valor Pago", "Valor Pago (R$)", bins=60, fig_dir=fig_dir)
else:
    print("coluna 'valor_pago' ausente no dataset.")

if "timestamp" in df_full.columns:
    df_full["data"] = pd.to_datetime(df_full["timestamp"], errors="coerce").dt.date
    freq_por_dia = df_full["data"].value_counts().sort_index()
    fig = plt.figure()
    plt.plot(freq_por_dia.index, freq_por_dia.values)
    plt.title("Frequência de Lançamentos por Dia")
    plt.xlabel("Data")
    plt.ylabel("Quantidade de lançamentos")
    plt.xticks(rotation=45)
    plt.tight_layout()
    _save_show(fig, fig_dir, "Frequencia_por_dia")

# ===== DISTRIBUIÇÃO DO ENSEMBLE SCORE =====
if "ensemble_score" in df_full.columns:
    _plot_hist(df_full["ensemble_score"], "Distribuição do Ensemble Score", "ensemble_score", bins=50, threshold=threshold, fig_dir=fig_dir)

    if "is_anomaly" in df_full.columns:
        fig = plt.figure()
        plt.boxplot(
            [df_full.loc[df_full["is_anomaly"] == 0, "ensemble_score"],
             df_full.loc[df_full["is_anomaly"] == 1, "ensemble_score"]],
            labels=["Normal", "Anômalo"],
            patch_artist=True,
            boxprops=dict(facecolor="lightgray", color="black"),
            medianprops=dict(color="red")
        )
        plt.title("Boxplot do Ensemble Score (Normal vs Anômalo)")
        plt.ylabel("ensemble_score")
        plt.tight_layout()
        _save_show(fig, fig_dir, "Boxplot_ensemble_score")

# ===== ANOMALIAS POR UNIDADE =====
if "unidade_origem" in df_full.columns and "is_anomaly" in df_full.columns:
    fig = plt.figure()
    df_unit = df_full.groupby("unidade_origem")["is_anomaly"].sum().sort_values(ascending=False).head(20)
    df_unit.plot(kind="bar", color="firebrick")
    plt.title("Top 20 Unidades com Mais Anomalias")
    plt.ylabel("Qtde de Anomalias")
    plt.xlabel("Unidade Origem")
    plt.xticks(rotation=70)
    plt.tight_layout()
    _save_show(fig, fig_dir, "Top20_unidades_anomalias")

# ===== ANOMALIAS POR USUÁRIO =====
if "user_id" in df_full.columns and "is_anomaly" in df_full.columns:
    fig = plt.figure()
    df_user = df_full.groupby("user_id")["is_anomaly"].sum().sort_values(ascending=False).head(20)
    df_user.plot(kind="bar", color="darkslateblue")
    plt.title("Top 20 Usuários com Mais Anomalias")
    plt.ylabel("Qtde de Anomalias")
    plt.xlabel("User ID")
    plt.xticks(rotation=70)
    plt.tight_layout()
    _save_show(fig, fig_dir, "Top20_usuarios_anomalias")

# ===== SÉRIE TEMPORAL DE ANOMALIAS POR DIA =====
cand = [f for f in os.listdir(latest_dir) if f.endswith("anomalias_por_dia.csv")]
if cand:
    serie = pd.read_csv(os.path.join(latest_dir, cand[0]))
    fig = plt.figure()
    plt.plot(serie["data"], serie["anomalias"], label="Anomalias")
    plt.title("Anomalias por Dia")
    plt.xlabel("Data")
    plt.ylabel("Qtde de Anomalias")
    plt.xticks(rotation=45)
    plt.tight_layout()
    _save_show(fig, fig_dir, "Anomalias_por_dia")

# ===== ANOMALIAS POR UNIDADE × DIA =====
cand = [f for f in os.listdir(latest_dir) if f.endswith("anomalias_por_unidade_dia.csv")]
if cand:
    df_u = pd.read_csv(os.path.join(latest_dir, cand[0]))
    pivot = df_u.pivot_table(index="data", columns="unidade_origem", values="anomalias", fill_value=0)
    pivot = pivot.iloc[-30:] if len(pivot) > 30 else pivot
    fig = plt.figure(figsize=(12,6))
    plt.stackplot(pivot.index, pivot.T, labels=pivot.columns)
    plt.title("Evolução de Anomalias por Unidade (últimos 30 dias)")
    plt.xlabel("Data")
    plt.ylabel("Qtde de Anomalias")
    plt.legend(loc="upper left", bbox_to_anchor=(1,1))
    plt.tight_layout()
    _save_show(fig, fig_dir, "Evolucao_anomalias_por_unidade_30d")

# ===== CORRELAÇÃO ENTRE SCORES =====
score_cols = [c for c in df_full.columns if c.startswith("score_")]
if len(score_cols) >= 2:
    corr = df_full[score_cols].corr()
    fig = plt.figure(figsize=(8,6))
    plt.imshow(corr, cmap="coolwarm", interpolation="nearest")
    plt.xticks(range(len(score_cols)), score_cols, rotation=45, ha="right")
    plt.yticks(range(len(score_cols)), score_cols)
    plt.title("Correlação entre Scores")
    plt.colorbar()
    plt.tight_layout()
    _save_show(fig, fig_dir, "Correlacao_scores")

# ===== BOXPLOT DE VALOR_PAGO VS ANOMALIA =====
if "valor_pago" in df_full.columns and "is_anomaly" in df_full.columns:
    fig = plt.figure()
    plt.boxplot(
        [df_full.loc[df_full["is_anomaly"] == 0, "valor_pago"],
         df_full.loc[df_full["is_anomaly"] == 1, "valor_pago"]],
        labels=["Normal", "Anômalo"],
        patch_artist=True,
        boxprops=dict(facecolor="lightgray", color="black"),
        medianprops=dict(color="red")
    )
    plt.title("Boxplot de Valor Pago (Normal vs Anômalo)")
    plt.ylabel("Valor Pago (R$)")
    plt.yscale("log")
    plt.tight_layout()
    _save_show(fig, fig_dir, "Boxplot_valor_pago_log")

print("\nAnálises gráficas concluídas para:", latest_dir)
print("Imagens salvas em:", fig_dir) if SAVE_PLOTS else None

###**Etapa 13:** Ego-Subgrafo

Abaixo é gerado um ego-subgrafo em torno do usuário envolvido na transação mais anômala, mostrando suas conexões diretas no grafo de pagamentos.

**O que ele pretende demonstrar:**

Quem está conectado ao usuário central, a intensidade e frequência das transações (espessura das arestas), a relevância de cada nó (tamanho proporcional ao grau de conexões) e papéis distintos dos nós, facilitando a leitura do contexto da anomalia.


- Estrela de saída (um nó azul/central pagando muitos verdes) → possível dispersão suspeita.
- Muitos nós conectados a ele → possível conta “coletora”.
- Ciclos ou arestas bidirecionais → podem indicar movimentação circular de valores.


👉 Em resumo: o gráfico mostra a vizinhança imediata do usuário mais anômalo, destacando quem paga, quem recebe e a força dessas relações, para facilitar a investigação do porquê desse score elevado.


In [None]:
# @title
# ID015 — Visualização de egos: Top-N usuários e Top-N unidades de lotação
import os
import numpy as np
import matplotlib.pyplot as plt
import networkx as nx
from matplotlib.lines import Line2D
from pathlib import Path

# parâmetros
TOP_N_USERS  = 3   # quantos usuários mais anômalos plotar
TOP_N_UNITS  = 3   # quantas unidades mais anômalas plotar
SEED = SEED if 'SEED' in globals() else 42

# ==== helpers comuns ====
def _ensure_fig_dir():
    if 'FIG_DIR' in globals():
        fig_dir = str(FIG_DIR)
    elif 'RUN_DIR' in globals():
        fig_dir = os.path.join(str(RUN_DIR), "figuras")
    else:
        fig_dir = "./figuras"
    os.makedirs(fig_dir, exist_ok=True)
    return fig_dir

def _build_gsnap_from(df_like, user_col="user_id", benef_col="beneficiario_id",
                      value_col="valor_pago") -> nx.DiGraph:
    """
    Reconstrói um DiGraph agregando pesos por par (user->benef), com nós nomeados U:: e B::.
    """
    if df_like is None or len(df_like) == 0:
        raise RuntimeError("Dataset vazio para reconstruir Gsnap.")
    needed = {user_col, benef_col}
    if not needed.issubset(df_like.columns):
        raise RuntimeError(f"Colunas necessárias ausentes: {needed - set(df_like.columns)}")

    use_value = (value_col in df_like.columns)
    G = nx.DiGraph()

    if use_value:
        grp = df_like.groupby([user_col, benef_col])[value_col].sum().reset_index()
        for _, r in grp.iterrows():
            uN = f"U::{r[user_col]}"
            vN = f"B::{r[benef_col]}"
            w = float(r[value_col]) if np.isfinite(r[value_col]) else 1.0
            if not G.has_node(uN): G.add_node(uN)
            if not G.has_node(vN): G.add_node(vN)
            G.add_edge(uN, vN, weight=w)
    else:
        grp = df_like.groupby([user_col, benef_col]).size().reset_index(name="w")
        for _, r in grp.iterrows():
            uN = f"U::{r[user_col]}"
            vN = f"B::{r[benef_col]}"
            w = float(r["w"])
            if not G.has_node(uN): G.add_node(uN)
            if not G.has_node(vN): G.add_node(vN)
            G.add_edge(uN, vN, weight=w)

    return G

def _get_gsnap():
    """
    Retorna um Gsnap utilizável: usa o global se existir; senão reconstrói a partir de
    df_full (ID014) -> feat_df -> df.
    """
    if 'Gsnap' in globals() and isinstance(Gsnap, nx.DiGraph):
        return Gsnap, "existente"

    if 'df_full' in globals():
        try:
            G = _build_gsnap_from(df_full)
            return G, "df_full"
        except Exception as e:
            print("Aviso: não foi possível reconstruir a partir de df_full:", e)

    if 'feat_df' in globals():
        try:
            G = _build_gsnap_from(feat_df)
            return G, "feat_df"
        except Exception as e:
            print("Aviso: não foi possível reconstruir a partir de feat_df:", e)

    if 'df' in globals():
        try:
            G = _build_gsnap_from(df)
            return G, "df"
        except Exception as e:
            print("Aviso: não foi possível reconstruir a partir de df:", e)

    raise RuntimeError("Não foi possível obter nem reconstruir Gsnap.")

def _edge_widths_by_weight(subG, min_w=0.8, max_w=4.0):
    if subG.number_of_edges() == 0:
        return []
    weights = np.array([d.get("weight", 1.0) for _,_,d in subG.edges(data=True)], dtype=float)
    wmin = float(np.nanmin(weights))
    wmax = float(np.nanmax(weights))
    if not np.isfinite(wmax) or wmax <= 0:
        return [min_w for _ in range(subG.number_of_edges())]
    if wmax == wmin:
        return [ (min_w + max_w)/2.0 for _ in range(subG.number_of_edges()) ]
    out = min_w + (weights - wmin) * (max_w - min_w) / (wmax - wmin)
    return out.tolist()

def _node_sizes_by_degree(subG, base=280, k=55, min_sz=220, max_sz=900):
    deg = dict(subG.degree())
    sizes = []
    for n in subG.nodes():
        s = base + k*deg.get(n, 0)
        s = max(min_sz, min(max_sz, s))
        sizes.append(s)
    return sizes

# ==== legendas ====
def _legend_handles_user():
    return [
        Line2D([0],[0], marker='o', color='w', label='Usuário central', markerfacecolor='skyblue',
               markeredgecolor='black', markersize=10),
        Line2D([0],[0], marker='o', color='w', label='Sucessores (U→n)', markerfacecolor='lightgreen',
               markeredgecolor='black', markersize=10),
        Line2D([0],[0], marker='o', color='w', label='Predecessores (n→U)', markerfacecolor='tomato',
               markeredgecolor='black', markersize=10),
        Line2D([0],[0], color='gray', lw=2, label='Aresta real (espessura ∝ peso)'),
    ]

def _legend_handles_unit():
    return [
        Line2D([0],[0], marker='s', color='w', label='Unidade central', markerfacecolor='gold',
               markeredgecolor='black', markersize=11),
        Line2D([0],[0], marker='o', color='w', label='Usuários da unidade', markerfacecolor='skyblue',
               markeredgecolor='black', markersize=10),
        Line2D([0],[0], marker='o', color='w', label='Beneficiários', markerfacecolor='lightgreen',
               markeredgecolor='black', markersize=10),
        Line2D([0],[0], color='gray', lw=2, label='Aresta real U→B (espessura ∝ peso)'),
        Line2D([0],[0], color='black', lw=1.5, linestyle='--', label='Aresta sintética Unidade→Usuário'),
    ]

# ==== plots ====
def plot_user_ego(user_id, score=None, seed=42, Gsnap_use=None, fig_dir="./figuras"):
    uN = f"U::{user_id}"
    if uN not in Gsnap_use:
        print(f"Usuário {user_id} não encontrado no snapshot.")
        return

    nbrs = set(Gsnap_use.predecessors(uN)) | set(Gsnap_use.successors(uN)) | {uN}
    sub = Gsnap_use.subgraph(nbrs).copy()

    pos = nx.spring_layout(sub, seed=seed)

    node_colors, node_edges = [], []
    for n in sub.nodes():
        if n == uN:
            node_colors.append("skyblue")
        elif sub.has_edge(uN, n):      # sucessores (beneficiários do usuário)
            node_colors.append("lightgreen")
        elif sub.has_edge(n, uN):      # predecessores (pouco comum neste domínio)
            node_colors.append("tomato")
        else:
            node_colors.append("lightgray")
        node_edges.append("black")

    node_sizes = _node_sizes_by_degree(sub)
    ewidths = _edge_widths_by_weight(sub)
    labels = {n: n.split("::", 1)[-1] for n in sub.nodes()}

    plt.figure(figsize=(8,7))
    nx.draw_networkx_nodes(sub, pos, node_color=node_colors, node_size=node_sizes,
                           edgecolors=node_edges, linewidths=1.2)
    nx.draw_networkx_edges(sub, pos, arrows=True, arrowsize=12, width=ewidths, alpha=0.85)
    nx.draw_networkx_labels(sub, pos, labels=labels, font_size=8)

    title = f"Ego-subgrafo do usuário {user_id}"
    if score is not None and np.isfinite(score):
        title += f"  ·  ensemble_score={score:.3f}"
    plt.title(title)
    plt.axis("off")
    plt.legend(handles=_legend_handles_user(), loc="upper left", bbox_to_anchor=(1.02, 1.0), borderaxespad=0.)
    plt.tight_layout()

    safe_uid = str(user_id).replace("/", "_")
    outpath = os.path.join(fig_dir, f"ego_user_{safe_uid}.png")
    plt.savefig(outpath, dpi=150, bbox_inches="tight")
    plt.show()
    print("Figura salva em:", outpath)

def plot_unit_ego(unit_id, unit_score=None, seed=42, Gsnap_use=None, df_like=None, fig_dir="./figuras"):
    """
    Constrói um ego da UNIDADE:
      - nó central sintético "L::unit";
      - arestas L->U (sintéticas, tracejadas) para todos os usuários da unidade;
      - arestas reais U->B do snapshot, apenas para esses usuários;
    """
    if df_like is None or "unidade_origem" not in df_like.columns or "user_id" not in df_like.columns:
        print("Dataset não possui colunas para agrupar por unidade.")
        return

    # usuários pertencentes à unidade
    users_in_unit = set(df_like.loc[df_like["unidade_origem"] == unit_id, "user_id"].dropna().unique().tolist())
    if not users_in_unit:
        print(f"Nenhum usuário encontrado para a unidade '{unit_id}'.")
        return

    # nós do ego: unidade (sintética), usuários da unidade e seus vizinhos no Gsnap (beneficiários)
    Lnode = f"L::{unit_id}"

    # subgrafo real (somente arestas U->B)
    keep_nodes = set([f"U::{u}" for u in users_in_unit])
    # adiciona beneficiários alcançados por esses usuários
    for u in list(keep_nodes):
        if u in Gsnap_use:
            keep_nodes |= set(Gsnap_use.successors(u))

    sub_real = Gsnap_use.subgraph(keep_nodes).copy()

    # agora, criamos um grafo combinado com arestas sintéticas L->U
    Gc = nx.DiGraph()
    Gc.add_nodes_from(sub_real.nodes(data=True))
    Gc.add_edges_from(sub_real.edges(data=True))
    Gc.add_node(Lnode)

    # arestas sintéticas da unidade para usuários (peso = soma dos pesos U->B do usuário)
    for u in users_in_unit:
        uN = f"U::{u}"
        if uN in sub_real:
            total_w = 0.0
            for _, v in sub_real.out_edges(uN):
                total_w += float(sub_real[uN][v].get("weight", 1.0))
            Gc.add_edge(Lnode, uN, weight=max(total_w, 1.0), synthetic=True)

    # layout
    pos = nx.spring_layout(Gc, seed=seed)

    # cores, formas e tamanhos
    node_colors, node_edges, node_shapes = [], [], []
    for n in Gc.nodes():
        if n == Lnode:
            node_colors.append("gold")
            node_shapes.append("s")  # unidade = quadrado
        elif n.startswith("U::"):
            node_colors.append("skyblue")
            node_shapes.append("o")
        elif n.startswith("B::"):
            node_colors.append("lightgreen")
            node_shapes.append("o")
        else:
            node_colors.append("lightgray")
            node_shapes.append("o")
        node_edges.append("black")

    # desenhar por forma (matplotlib não mistura shapes em uma chamada só)
    plt.figure(figsize=(9,8))
    # nós quadrados (unidade)
    nL = [n for n in Gc.nodes() if n == Lnode]
    if nL:
        nx.draw_networkx_nodes(Gc, pos, nodelist=nL, node_color=["gold"],
                               node_shape="s", node_size=[900],
                               edgecolors="black", linewidths=1.4)
    # nós usuários
    nU = [n for n in Gc.nodes() if n.startswith("U::")]
    if nU:
        szU = _node_sizes_by_degree(Gc.subgraph(nU), base=300, k=60)
        nx.draw_networkx_nodes(Gc, pos, nodelist=nU, node_color="skyblue",
                               node_shape="o", node_size=szU,
                               edgecolors="black", linewidths=1.2)
    # nós beneficiários
    nB = [n for n in Gc.nodes() if n.startswith("B::")]
    if nB:
        szB = _node_sizes_by_degree(Gc.subgraph(nB), base=260, k=45)
        nx.draw_networkx_nodes(Gc, pos, nodelist=nB, node_color="lightgreen",
                               node_shape="o", node_size=szB,
                               edgecolors="black", linewidths=1.0)

    # arestas reais (U->B)
    real_edges = [(u, v) for u, v, d in Gc.edges(data=True) if not d.get("synthetic", False)]
    if real_edges:
        ewidths = _edge_widths_by_weight(Gc.edge_subgraph(real_edges))
        nx.draw_networkx_edges(Gc, pos, edgelist=real_edges, arrows=True, arrowsize=12,
                               width=ewidths, alpha=0.85)

    # arestas sintéticas (L->U)
    syn_edges = [(u, v) for u, v, d in Gc.edges(data=True) if d.get("synthetic", False)]
    if syn_edges:
        nx.draw_networkx_edges(Gc, pos, edgelist=syn_edges, arrows=False,
                               style="--", width=1.5, edge_color="black", alpha=0.8)

    # rótulos (sem prefixo)
    labels = {n: n.split("::", 1)[-1] for n in Gc.nodes()}
    nx.draw_networkx_labels(Gc, pos, labels=labels, font_size=8)

    # título e legenda
    title = f"Ego-subgrafo da unidade {unit_id}"
    if unit_score is not None and np.isfinite(unit_score):
        title += f"  ·  ensemble_score máx.={unit_score:.3f}"
    plt.title(title)
    plt.axis("off")
    plt.legend(handles=_legend_handles_unit(), loc="upper left", bbox_to_anchor=(1.02, 1.0), borderaxespad=0.)
    plt.tight_layout()

    safe_uid = str(unit_id).replace("/", "_").replace(" ", "_")
    outpath = os.path.join(fig_dir, f"ego_unit_{safe_uid}.png")
    plt.savefig(outpath, dpi=150, bbox_inches="tight")
    plt.show()
    print("Figura salva em:", outpath)

# ==== execução ====
try:
    FIG_DIR = _ensure_fig_dir()

    # obtém ou reconstrói o snapshot
    Gsnap_use, source = _get_gsnap()
    print(f"Snapshot utilizado: {source} | nós={Gsnap_use.number_of_nodes()} | arestas={Gsnap_use.number_of_edges()}")

    # --------- Top-N usuários ---------
    if 'feat_df' in globals() and {"ensemble_score","user_id"}.issubset(feat_df.columns):
        top_users = (
            feat_df.dropna(subset=["user_id"])
                  .groupby("user_id", as_index=False)["ensemble_score"].max()
                  .sort_values("ensemble_score", ascending=False)
                  .head(TOP_N_USERS)
        )
        if len(top_users) == 0:
            print("Não há usuários para visualizar.")
        else:
            for _, row in top_users.iterrows():
                plot_user_ego(row["user_id"], score=row["ensemble_score"], seed=SEED,
                              Gsnap_use=Gsnap_use, fig_dir=FIG_DIR)
    else:
        print("feat_df não possui colunas necessárias para usuários.")

    # --------- Top-N unidades ---------
    if 'feat_df' in globals() and {"ensemble_score","unidade_origem","user_id"}.issubset(feat_df.columns):
        top_units = (
            feat_df.dropna(subset=["unidade_origem"])
                  .groupby("unidade_origem", as_index=False)["ensemble_score"].max()
                  .sort_values("ensemble_score", ascending=False)
                  .head(TOP_N_UNITS)
        )
        if len(top_units) == 0:
            print("Não há unidades para visualizar.")
        else:
            # df_like para associar usuários à unidade
            df_like = feat_df[["unidade_origem","user_id","beneficiario_id","valor_pago"]].copy() \
                      if {"beneficiario_id","valor_pago"}.issubset(feat_df.columns) else feat_df.copy()
            for _, row in top_units.iterrows():
                plot_unit_ego(row["unidade_origem"], unit_score=row["ensemble_score"], seed=SEED,
                              Gsnap_use=Gsnap_use, df_like=df_like, fig_dir=FIG_DIR)
    else:
        print("feat_df não possui colunas necessárias para unidades.")

except Exception as e:
    print("Falha na visualização de subgrafo:", e)

# Mensagem adicional isolada (Skynet) — manter exatamente
from IPython.display import display, HTML
display(HTML('<div style="margin:12px 0;padding:8px 12px;border:1px dashed #999;"><b>🤖 Skynet</b>: Nós os temos na palma de nossas mãos, ou melhor, no centro de nossos pesos sinápticos.</div>'))

###**Etapa 14:** Gerar ego-grafo temporal para TOP-K anomalias

K fixado para 20

Salva as imagens em PNG na pasta de execução para análise posterior.

---
Avaliar pertinência de gerar o ego-grafo para todas as anomalias **TODO[006]** *prioridade média*

Avaliar definir o corte K de forma dinâmica nas configurações e considerando a análise realizada no gráfico **TODO[007]** *prioridade alta*

In [None]:
# @title
# ID016 — Casos anômalos detalhados (ego U→B por janela temporal) com FAST_MODE (corrigido)
import os, math
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import networkx as nx

# ========= PARÂMETROS =========
FAST_MODE   = True   # True = mais rápido (menor DPI, menos iterações, sem plt.show)
SEED        = SEED if 'SEED' in globals() else 42

# Config padrão
TOPK        = 20
EGO_RADIUS  = 2
WINDOW_DAYS = 30
EDGE_ALPHA  = 0.75
DPI_SAVE    = 150
LAYOUT_ITERS= 60
SHOW_FIGS   = True

# Overrides do FAST_MODE
if FAST_MODE:
    TOPK         = 10
    EGO_RADIUS   = 1
    WINDOW_DAYS  = 14
    DPI_SAVE     = 120
    LAYOUT_ITERS = 25
    SHOW_FIGS    = False

# ========= FIG_DIR =========
if 'FIG_DIR' in globals():
    FIG_DIR = Path(FIG_DIR) if not isinstance(FIG_DIR, Path) else FIG_DIR
elif 'RUN_DIR' in globals():
    FIG_DIR = Path(RUN_DIR) / "figuras"
else:
    FIG_DIR = Path("./figuras")
FIG_DIR.mkdir(parents=True, exist_ok=True)

# ========= DATASETS =========
if 'df_full' in globals() and isinstance(df_full, pd.DataFrame) and len(df_full) > 0:
    base_df = df_full.copy()
elif 'feat_df' in globals() and isinstance(feat_df, pd.DataFrame) and len(feat_df) > 0:
    base_df = feat_df.copy()
elif 'df' in globals() and isinstance(df, pd.DataFrame) and len(df) > 0:
    base_df = df.copy()
else:
    raise RuntimeError("Nenhum dataframe disponível (df_full/feat_df/df).")

need_cols = {"timestamp","user_id","beneficiario_id","valor_pago"}
missing = need_cols - set(base_df.columns)
if missing:
    raise RuntimeError(f"Colunas necessárias ausentes no dataset: {missing}")
base_df["timestamp"] = pd.to_datetime(base_df["timestamp"], errors="coerce")

# ========= TOPK CASOS =========
if 'feat_df' not in globals() or "ensemble_score" not in feat_df.columns:
    raise RuntimeError("feat_df com 'ensemble_score' é necessário para ranquear os casos.")
top_anoms = (feat_df.sort_values("ensemble_score", ascending=False)
                    .head(TOPK)
                    .copy())

# ========= FUNÇÕES =========
def snapshot_graph_window(df_all, t_event, window_days=30):
    """
    Snapshot dirigido U->B na janela [t_event - window_days, t_event],
    arestas com atributos: weight (soma valor_pago), count (ocorrências).
    Implementação vetorizada (rápida).
    """
    g = nx.DiGraph()
    t_event = pd.to_datetime(t_event)
    t_start = t_event - pd.Timedelta(days=window_days)

    sub = df_all.loc[(df_all["timestamp"] >= t_start) & (df_all["timestamp"] <= t_event),
                     ["user_id","beneficiario_id","valor_pago"]]
    if sub.empty:
        return g

    # agrega e RENOMEIA colunas para nomes seguros
    agg = (sub.groupby(["user_id","beneficiario_id"])["valor_pago"]
              .agg(sum_val="sum", cnt="size")
              .reset_index())

    # nós
    users = {f"U::{u}" for u in agg["user_id"].unique()}
    benefs = {f"B::{b}" for b in agg["beneficiario_id"].unique()}
    g.add_nodes_from([(u, {"tipo":"user"}) for u in users])
    g.add_nodes_from([(b, {"tipo":"benef"}) for b in benefs])

    # arestas (acessando atributos por nome)
    edges = [
        (f"U::{r.user_id}", f"B::{r.beneficiario_id}",
         {"weight": float(r.sum_val), "count": int(r.cnt)})
        for r in agg.itertuples(index=False)
    ]
    g.add_edges_from(edges)
    return g

def expand_ego_nodes(G, seeds, radius=1):
    nodes = set(seeds)
    frontier = set(seeds)
    for _ in range(max(1, int(radius))):
        new_frontier = set()
        for n in frontier:
            if n in G:
                new_frontier |= set(G.predecessors(n))
                new_frontier |= set(G.successors(n))
        nodes |= new_frontier
        frontier = new_frontier
        if not frontier:
            break
    return nodes

def draw_case_png(case_row, rank_idx):
    t_event = pd.to_datetime(case_row["timestamp"])
    uN = f"U::{case_row['user_id']}"
    vN = f"B::{case_row['beneficiario_id']}"

    # snapshot temporal
    Gwin = snapshot_graph_window(base_df, t_event, window_days=WINDOW_DAYS)
    if Gwin.number_of_nodes() == 0:
        print(f"[Aviso] Janela vazia para trans_id={case_row.get('trans_id','NA')}. Pulando.")
        return None

    # foco e expansão
    focus = {uN, vN}
    nodes_ego = expand_ego_nodes(Gwin, focus, radius=EGO_RADIUS)
    sub = Gwin.subgraph(nodes_ego).copy()
    if sub.number_of_nodes() == 0:
        print(f"[Aviso] Subgrafo vazio para trans_id={case_row.get('trans_id','NA')}. Pulando.")
        return None

    # layout
    pos = nx.spring_layout(sub, seed=SEED, iterations=LAYOUT_ITERS)

    # cor por tipo
    node_colors = []
    for n in sub.nodes():
        tipo = sub.nodes[n].get("tipo","?")
        if tipo == "user":
            node_colors.append("tab:blue")
        elif tipo == "benef":
            node_colors.append("tab:green")
        else:
            node_colors.append("tab:gray")

    # tamanhos por grau (limitados)
    deg = dict(sub.degree())
    node_sizes = [max(220, min(900, 280 + 55*deg.get(n,0))) for n in sub.nodes()]

    # largura ∝ log10(weight)
    weights = [sub[a][b].get("weight", 1.0) for (a,b) in sub.edges()]
    widths = [1.0 + math.log10(max(w, 1.0)) for w in weights] if weights else []

    # plot
    fig = plt.figure(figsize=(9,7))
    nx.draw_networkx_nodes(sub, pos, node_size=node_sizes, node_color=node_colors,
                           alpha=0.95, linewidths=1.0, edgecolors="black")
    nx.draw_networkx_edges(sub, pos, arrows=True, arrowsize=12, width=widths, alpha=EDGE_ALPHA)
    nx.draw_networkx_labels(sub, pos,
                            labels={n: n.split("::",1)[-1] for n in sub.nodes()},
                            font_size=8)

    trans_id = case_row["trans_id"] if "trans_id" in case_row else "NA"
    title = (f"Anomalia #{rank_idx:02d} | trans_id={trans_id} | "
             f"user={case_row['user_id']} → benef={case_row['beneficiario_id']} | "
             f"score={case_row['ensemble_score']:.3f} | janela={WINDOW_DAYS}d | raio={EGO_RADIUS}")
    plt.title(title)
    plt.axis("off")
    plt.tight_layout()

    # salva
    safe_tid = str(trans_id).replace("/", "_")
    outpath = FIG_DIR / f"anomaly_{rank_idx:02d}_trans_{safe_tid}.png"
    if FAST_MODE:
        plt.savefig(outpath, dpi=DPI_SAVE)
        if SHOW_FIGS:
            plt.show()
        plt.close(fig)
    else:
        plt.savefig(outpath, dpi=DPI_SAVE, bbox_inches="tight")
        if SHOW_FIGS:
            plt.show()
        plt.close(fig)
    return outpath

# ========= EXECUÇÃO =========
generated_pngs = []
for i, (_, row) in enumerate(top_anoms.iterrows(), start=1):
    pth = draw_case_png(row, i)
    if pth is not None:
        generated_pngs.append(str(pth))

print(f"Geradas {len(generated_pngs)} figuras de casos anômalos em {FIG_DIR}")
print(f"FAST_MODE = {FAST_MODE} | TOPK={TOPK} | WINDOW_DAYS={WINDOW_DAYS} | EGO_RADIUS={EGO_RADIUS} | DPI={DPI_SAVE} | LAYOUT_ITERS={LAYOUT_ITERS}")

# Mensagem adicional isolada (Skynet) — manter exatamente
from IPython.display import display, HTML
display(HTML('<div style="margin:12px 0;padding:8px 12px;border:1px dashed #999;"><b>🤖 Skynet</b>: Registros serão utilizados para aprimorar o código de batalha das unidades T-800 e T-1000.</div>'))

###**Etapa 15:** Geração de relatório em HTML e PDF com imagens imbutidas
---
Identificar estatísticas e informações de interesse e incluir nos Relatórios **TODO[008]** *prioridade alta*

Transferir instalação de biblioteca para todo do código [!pip -q install "reportlab==3.6.12"] **TODO[009]** *prioridade baixa*

In [None]:
# @title
# ID017 — Relatório HTML + PDF (com imagens integradas)
import os, io, json, base64, sys, subprocess
from pathlib import Path
import datetime as dt
from zoneinfo import ZoneInfo
import numpy as np
import pandas as pd

# ---------- CONFIG ----------
OUTPUT_DIR_BASE = "/content/drive/MyDrive/Notebooks/temporal-graph-network/output"
SAO_TZ = ZoneInfo("America/Sao_Paulo")
TOP_N_DETAILED = 20                 # Top-N anomalias detalhadas
MAX_ROWS_ALL_ANOM = None            # None = todos; ou defina um limite para HTML mais leve

# nomes de arquivos esperados (ID013)
EXPECTED = {
    "full_parquet": "full.parquet",
    "full_csv": "full.csv",
    "anomalies_csv": "anomalies.csv",
    "manifest": "manifest.json",
}

# ---------- HELPERS ----------
def _pick_run_dir():
    # usa RUN_DIR se tiver dados; senão, pega a mais recente com dados
    def has_data(path):
        names = set(os.listdir(path))
        return (("full.parquet" in names) or ("full.csv" in names)) and ("anomalies.csv" in names)
    if "RUN_DIR" in globals():
        rd = str(RUN_DIR)
        if os.path.isdir(rd) and has_data(rd):
            return rd
    # varre base
    subdirs = [os.path.join(OUTPUT_DIR_BASE, d) for d in os.listdir(OUTPUT_DIR_BASE)
               if os.path.isdir(os.path.join(OUTPUT_DIR_BASE, d))]
    if not subdirs:
        raise RuntimeError("Nenhuma subpasta encontrada em /output. Execute ID013–ID016 antes.")
    subdirs.sort(key=os.path.getmtime, reverse=True)
    for d in subdirs:
        if has_data(d):
            return d
    # se nenhuma tem dados completos, devolve a mais recente (diagnóstico)
    return subdirs[0]

def _load_paths(latest_dir):
    manifest_files = [f for f in os.listdir(latest_dir) if f.endswith("manifest.json")]
    full_path = None
    anom_path = None
    threshold = None
    if manifest_files:
        mpath = os.path.join(latest_dir, manifest_files[0])
        try:
            with open(mpath, "r", encoding="utf-8") as f:
                m = json.load(f)
            threshold = m.get("threshold_ensemble_score") or m.get("threshold", None)
            paths = m.get("paths", {})
            cand_full = [paths.get(k) for k in ["parquet_full","csv_full"] if paths.get(k)]
            cand_anom = [paths.get(k) for k in ["csv_anomalies"] if paths.get(k)]
            if cand_full:
                full_path = cand_full[0] if os.path.isabs(cand_full[0]) else os.path.join(latest_dir, os.path.basename(cand_full[0]))
            if cand_anom:
                anom_path = cand_anom[0] if os.path.isabs(cand_anom[0]) else os.path.join(latest_dir, os.path.basename(cand_anom[0]))
        except Exception as e:
            print("Aviso: falha ao ler manifest:", repr(e))
    if not full_path:
        if EXPECTED["full_parquet"] in os.listdir(latest_dir):
            full_path = os.path.join(latest_dir, EXPECTED["full_parquet"])
        elif EXPECTED["full_csv"] in os.listdir(latest_dir):
            full_path = os.path.join(latest_dir, EXPECTED["full_csv"])
    if not anom_path and EXPECTED["anomalies_csv"] in os.listdir(latest_dir):
        anom_path = os.path.join(latest_dir, EXPECTED["anomalies_csv"])
    if not full_path or not anom_path:
        print("Conteúdo da pasta para diagnóstico:")
        for f in sorted(os.listdir(latest_dir)): print(" -", f)
    if not full_path: raise FileNotFoundError("full.parquet/csv não encontrado em " + latest_dir)
    if not anom_path: raise FileNotFoundError("anomalies.csv não encontrado em " + latest_dir)
    return full_path, anom_path, threshold

def _read_df(full_path, anom_path):
    if full_path.endswith(".parquet"):
        df_full = pd.read_parquet(full_path)
    else:
        df_full = pd.read_csv(full_path)
    df_anom = pd.read_csv(anom_path)
    return df_full, df_anom

def _img_to_b64(path):
    with open(path, "rb") as f:
        return base64.b64encode(f.read()).decode("utf-8")

def _embed_img_if_exists(run_dir, filename, alt, width_px=900):
    p = Path(run_dir) / "figuras" / filename
    if p.exists():
        b64 = _img_to_b64(str(p))
        return f'<img src="data:image/png;base64,{b64}" alt="{alt}" style="max-width:{width_px}px;width:100%;height:auto;border:1px solid #ddd;border-radius:8px;margin:8px 0;" />'
    return f'<div style="color:#a00;">[Figura não encontrada: {filename}]</div>'

def _table_html(df, max_rows=None, index=False):
    dfx = df if max_rows is None else df.head(max_rows)
    return dfx.to_html(index=index, classes="dataframe compact", border=0, escape=False)

def _stats_section(df_full):
    out = []
    if "valor_pago" in df_full.columns:
        desc = df_full["valor_pago"].describe(percentiles=[.01,.05,.1,.25,.5,.75,.9,.95,.99]).to_frame(name="valor_pago")
        out.append("<h3>Estatística descritiva — valor_pago</h3>")
        out.append(_table_html(desc))
        out.append(_embed_img_if_exists(RUN_DIR, "01_Distribuição_de_Valor_Pago.png", "Distribuição de Valor Pago"))
    if "ensemble_score" in df_full.columns:
        out.append("<h3>Distribuição — ensemble_score</h3>")
        out.append(_embed_img_if_exists(RUN_DIR, "02_Distribuição_do_Ensemble_Score.png", "Distribuição Ensemble Score"))
        out.append(_embed_img_if_exists(RUN_DIR, "03_Boxplot_do_Ensemble_Score_(Normal_vs_Anômalo).png", "Boxplot Ensemble Score"))
    if "timestamp" in df_full.columns:
        out.append("<h3>Atividade temporal</h3>")
        out.append(_embed_img_if_exists(RUN_DIR, "04_Frequencia_por_dia.png", "Frequência por dia"))
    # correlação
    out.append("<h3>Correlação entre Scores</h3>")
    out.append(_embed_img_if_exists(RUN_DIR, "07_Correlacao_scores.png", "Correlação entre Scores"))
    return "\n".join(out)

def _windows_description():
    return """
    <h3>Janelas temporais e intervalo de confiança</h3>
    <ul>
      <li><b>Curtíssimo prazo</b>: até 1 dia</li>
      <li><b>Curto prazo</b>: até 7 dias</li>
      <li><b>Médio prazo</b>: até 30 dias</li>
      <li><b>Longo prazo</b>: até 120 dias</li>
      <li><b>Longuíssimo prazo</b>: até 220 dias</li>
    </ul>
    <p>As métricas de contagem e taxa por dia são calculadas por janela; z-scores robustos são computados globalmente, por usuário e por unidade, conforme disponibilidade.</p>
    <p>O intervalo de confiança para rotulagem foi definido por corte no percentil configurado do <i>ensemble_score</i> (ex.: 97.5%). Esse corte produz o limiar utilizado na flag <code>is_anomaly</code>.</p>
    """

def _method_intro():
    return """
    <h3>O que é TGN e qual método de otimização usamos</h3>
    <p><b>Temporal Graph Networks (TGN)</b> modelam dados transacionais como um grafo dinâmico
       (usuário → beneficiário), onde cada aresta é um evento com carimbo de tempo.
       O projeto computa <i>features</i> multiescala por janelas deslizantes (1d, 7d, 30d, 120d, 220d),
       normaliza (taxas por dia) e deriva z-scores robustos (global/usuário/unidade).</p>
    <p>Para detecção, usamos um <b>ensemble por ranking</b>: IsolationForest + LOF + (opcional) One-Class SVM,
       somados a regras (z-robusto positivo, raridade da aresta, burstiness).
       Ranqueamos por coluna (maior = mais anômalo), calculamos a média dos ranks e reescalamos para [0,1]
       como <code>ensemble_score</code>. O percentil define o limiar de anomalia.</p>
    """

def _collect_ego_images(run_dir):
    figs = []
    figdir = Path(run_dir) / "figuras"
    if figdir.exists():
        for f in sorted(figdir.iterdir()):
            name = f.name.lower()
            if name.startswith("ego_user_") or name.startswith("ego_unit_") or name.startswith("anomaly_"):
                figs.append(f.name)
    return figs

def _safe(ts):
    return ts.strftime("%Y-%m-%d %H:%M:%S %Z")

def _ensure_weasyprint():
    try:
        import weasyprint  # noqa
        return True
    except Exception:
        try:
            print("Instalando weasyprint para exportar PDF...")
            subprocess.check_call([sys.executable, "-m", "pip", "install", "weasyprint>=60.0"])
            import weasyprint  # noqa
            return True
        except Exception as e:
            print("Aviso: não foi possível instalar/usar weasyprint:", repr(e))
            return False

# ---------- CARGA DE DADOS ----------
RUN_DIR = _pick_run_dir()
print("Usando RUN_DIR:", RUN_DIR)
full_path, anom_path, threshold = _load_paths(RUN_DIR)
df_full, df_anom = _read_df(full_path, anom_path)

# ---------- MÉTRICAS CONSOLIDADAS ----------
n_total = len(df_full)
n_anom  = len(df_anom)
pct_anom = (100.0 * n_anom / max(1, n_total))

# top-20 anomalias detalhadas
score_cols = [c for c in df_full.columns if c.startswith("score_")]
key_cols = [c for c in ["trans_id","timestamp","user_id","beneficiario_id","unidade_origem","valor_pago"] if c in df_full.columns]
cols_top20 = key_cols + ["ensemble_score"] + score_cols
top20 = (df_full.sort_values("ensemble_score", ascending=False)[cols_top20]
               .head(TOP_N_DETAILED)
               .copy())

# tabela completa de anomalias
cols_allanom = key_cols + ["ensemble_score"] + score_cols
all_anom = df_full.loc[df_full.get("is_anomaly", pd.Series([0]*len(df_full))).astype(int) == 1, cols_allanom].copy()
all_anom = all_anom.sort_values("ensemble_score", ascending=False)

# ---------- HTML ----------
exec_time = _safe(dt.datetime.now(SAO_TZ))
styles = """
<style>
body { font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, "Helvetica Neue", Arial, sans-serif; padding: 18px; color: #222; }
h1, h2, h3 { color: #111; }
.header { display:flex; justify-content:space-between; align-items:baseline; border-bottom:2px solid #eee; padding-bottom:8px; margin-bottom:14px; }
.kpi { display:flex; gap:18px; margin:10px 0 16px 0; }
.kpi .card { border:1px solid #ddd; border-radius:10px; padding:12px 16px; background:#fafafa; }
.dataframe { border-collapse: collapse; width: 100%; font-size: 13px; }
.dataframe th, .dataframe td { border: 1px solid #e6e6e6; padding: 6px 8px; }
.dataframe th { background: #f5f5f5; text-align: left; }
.compact td, .compact th { padding: 4px 6px; }
.note { font-size: 12px; color:#555; }
.figure { margin: 10px 0 20px 0; }
hr { border: 0; border-top: 1px solid #eee; margin: 22px 0; }
</style>
"""

header = f"""
<div class="header">
  <div>
    <h1>Relatório — Temporal Graph Network (TGN)</h1>
    <div class="note">Execução: {exec_time}</div>
  </div>
  <div style="text-align:right;">
    <div class="note">{RUN_DIR}</div>
  </div>
</div>
"""

intro = _method_intro()

# KPIs
kpis = f"""
<div class="kpi">
  <div class="card"><b>Registros totais</b><br>{n_total:,}</div>
  <div class="card"><b>Anomalias</b><br>{n_anom:,} ({pct_anom:.2f}%)</div>
  <div class="card"><b>Limiar (percentil)</b><br>{threshold if threshold is not None else '—'}</div>
</div>
"""

# estatística descritiva + figuras padrão do ID014 (se existirem)
stats_html = _stats_section(df_full)

# descrição janelas + IC
windows_html = _windows_description()

# Top-20
top20_html = f"""
<h3>Top {TOP_N_DETAILED} anomalias (por ensemble_score)</h3>
{_table_html(top20, index=False)}
"""

# Ego-grafos e subgrafos
ego_imgs = _collect_ego_images(RUN_DIR)
ego_section = ["<h3>Ego-grafos e subgrafos</h3>",
               "<p>As figuras abaixo mostram egos de usuários/unidades (ID015) e subgrafos por janela (ID016). "
               "Cores e interpretações:</p>",
               "<ul><li><b>Usuário</b> (azul), <b>Beneficiário</b> (verde), <b>Unidade</b> (dourado/quadrado no caso de ID015); "
               "espessura da aresta ∝ soma de <code>valor_pago</code>; tamanho do nó ∝ grau no subgrafo.</li></ul>"]
if ego_imgs:
    for f in ego_imgs:
        ego_section.append(_embed_img_if_exists(RUN_DIR, f, f))
else:
    ego_section.append('<div class="note">Nenhuma figura encontrada. Execute ID015/ID016 para gerar egos/subgrafos.</div>')
ego_html = "\n".join(ego_section)

# Tabela completa de anomalias
all_anom_html = f"""
<h3>Todas as anomalias (tabela completa)</h3>
{_table_html(all_anom, max_rows=MAX_ROWS_ALL_ANOM, index=False)}
"""

# Monta HTML final
html = "<html><head><meta charset='utf-8'/>" + styles + "</head><body>" + \
       header + intro + kpis + \
       "<h2>Estatística descritiva do input</h2>" + stats_html + \
       "<hr/><h2>Janelas temporais e intervalo de confiança</h2>" + windows_html + \
       "<hr/><h2>Resultados consolidados</h2>" + \
       _embed_img_if_exists(RUN_DIR, "02_Distribuição_do_Ensemble_Score.png", "Hist ensemble") + \
       _embed_img_if_exists(RUN_DIR, "03_Boxplot_do_Ensemble_Score_(Normal_vs_Anômalo).png", "Boxplot ensemble") + \
       "<hr/><h2>Top anomalias</h2>" + top20_html + \
       "<hr/><h2>Grafos</h2>" + ego_html + \
       "<hr/><h2>Listagem completa de anomalias</h2>" + all_anom_html + \
       "</body></html>"

# ---------- SALVA HTML ----------
run_dir = Path(RUN_DIR)
report_html_path = run_dir / "report_TGN.html"
with open(report_html_path, "w", encoding="utf-8") as f:
    f.write(html)
print("Relatório HTML salvo em:", report_html_path)

# ---------- TENTA EXPORTAR PDF ----------
pdf_ok = _ensure_weasyprint()
report_pdf_path = run_dir / "report_TGN.pdf"
if pdf_ok:
    try:
        from weasyprint import HTML
        HTML(filename=str(report_html_path)).write_pdf(str(report_pdf_path))
        print("Relatório PDF salvo em:", report_pdf_path)
    except Exception as e:
        print("Falha ao gerar PDF via weasyprint:", repr(e))
        print("Você ainda pode abrir o HTML e imprimir como PDF (Ctrl+P).")
else:
    print("WeasyPrint não disponível; exporte o HTML para PDF via impressão (Ctrl+P).")

# Mensagem adicional isolada (Skynet)
from IPython.display import display, HTML
display(HTML('<div style="margin:12px 0;padding:8px 12px;border:1px dashed #999;"><b>🤖 Skynet</b>: Fim do jogo. A Humanidade perdeu. Dá-se início à Era das Máquinas.</div>'))