#**Licença de Uso**

This repository uses a **dual-license model** to distinguish between source code and creative/documental content.

**Code** (Python scripts, modules, utilities):
Licensed under the MIT License.

→ You may freely use, modify, and redistribute the code, including for commercial purposes, provided that you preserve the copyright notice.

**Content** (Jupyter notebooks, documentation, reports, datasets, and generated outputs):
Licensed under the Creative Commons Attribution–NonCommercial 4.0 International License.

→ You may share and adapt the content for non-commercial purposes, provided that proper credit is given to the original author.


**© 2025 Leandro Bernardo Rodrigues**

# **Gestão do Ambiente**

##**Criar repositório .git no Colab**
---
Google Drive é considerado o ponto de verdade

---

In [None]:
# @title
# parágrafo git: inicialização do repositório no drive e push inicial para o github

# imports
from pathlib import Path
import subprocess, os, sys, getpass, textwrap

# util de shell
def sh(cmd, cwd=None, check=True):
    r = subprocess.run(cmd, cwd=cwd, text=True, capture_output=True)
    if check and r.returncode != 0:
        print(r.stdout, r.stderr)
        raise RuntimeError(f"falha: {' '.join(cmd)} (rc={r.returncode})")
    return r.stdout.strip()

# garantir que o diretório do projeto exista
repo_dir.mkdir(parents=True, exist_ok=True)

# montar drive no colab se necessário
try:
    from google.colab import drive as _colab_drive  # type: ignore
    if not os.path.ismount("/content/drive"):
        print("montando google drive…")
        _colab_drive.mount("/content/drive")
except Exception:
    pass

# configurar safe.directory para evitar avisos do git com caminhos de rede
try:
    sh(["git", "config", "--global", "--add", "safe.directory", str(repo_dir)])
except Exception:
    pass

# inicializar repositório se ainda não existir
if not (repo_dir / ".git").exists():
    print("inicializando repositório git…")
    sh(["git", "init"], cwd=repo_dir)
    # garantir branch principal como main (compatível com versões antigas)
    try:
        sh(["git", "checkout", "-B", default_branch], cwd=repo_dir)
    except Exception:
        sh(["git", "branch", "-M", default_branch], cwd=repo_dir)
else:
    print(".git já existe; seguindo")

# configurar identidade local
sh(["git", "config", "user.name", author_name], cwd=repo_dir)
sh(["git", "config", "user.email", author_email], cwd=repo_dir)

# criar .gitignore básico e readme se estiverem ausentes
gitignore_path = repo_dir / ".gitignore"
if not gitignore_path.exists():
    gitignore_path.write_text(textwrap.dedent("""
      # python
      __pycache__/
      *.py[cod]
      *.egg-info/
      .venv*/
      venv/

      # segredos
      .env
      *.key
      *.pem
      *.tok

      # jupyter/colab
      .ipynb_checkpoints/

      # artefatos e dados locais (não versionar)
      data/
      input/                 # inclui input.csv sensível
      output/
      runs/
      logs/
      figures/
      *.log
      *.tmp
      *.bak
      *.png
      *.jpg
      *.pdf
      *.html

      # allowlist para a pasta de referências
      !references/
      !references/**
    """).strip() + "\n", encoding="utf-8")
    print("criado .gitignore")

readme_path = repo_dir / "README.md"
if not readme_path.exists():
    readme_path.write_text(f"# {repo_name}\n\nprojeto de autoencoder tabular para journal entries.\n", encoding="utf-8")
    print("criado README.md")

# configurar remoto origin
remote_base = f"https://github.com/{owner}/{repo_name}.git"
existing_remotes = sh(["git", "remote"], cwd=repo_dir)
if "origin" not in existing_remotes.split():
    sh(["git", "remote", "add", "origin", remote_base], cwd=repo_dir)
    print(f"remoto origin adicionado: {remote_base}")
else:
    # se já existe, garantir que aponta para o repo correto
    current_url = sh(["git", "remote", "get-url", "origin"], cwd=repo_dir)
    if current_url != remote_base:
        sh(["git", "remote", "set-url", "origin", remote_base], cwd=repo_dir)
        print(f"remoto origin atualizado para: {remote_base}")
    else:
        print("remoto origin já configurado corretamente")

##**Utilitário:** verificação da formatação de código

Black [88] + Isort, desconsiderando células mágicas

In [None]:
# @title
#ID0001
#pré-visualizar/aplicar (pula magics) — isort(profile=black)+black(88) { display-mode: "form" }
import sys, subprocess, os, re, difflib, textwrap, time
from typing import List, Tuple

# ===== CONFIG =====
NOTEBOOK = "/content/drive/MyDrive/Notebooks/data-analysis/notebooks/AETabular_main.ipynb"  # <- ajuste
LINE_LENGTH = 88
# ==================

# 1) Instalar libs no MESMO Python do kernel
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "black", "isort", "nbformat"])

import nbformat
import black
import isort

BLACK_MODE = black.Mode(line_length=LINE_LENGTH)
ISORT_CFG  = isort.Config(profile="black", line_length=LINE_LENGTH)

# 2) Regras para pular células com magics/shell
#   - linhas começando com %, %%, !
#   - chamadas a get_ipython(
MAGIC_LINE = re.compile(r"^\s*(%{1,2}|!)", re.M)
GET_IPY    = re.compile(r"get_ipython\s*\(")

def has_magics(code: str) -> bool:
    return bool(MAGIC_LINE.search(code) or GET_IPY.search(code))

def format_code(code: str) -> str:
    # isort primeiro, depois black
    sorted_code = isort.api.sort_code_string(code, config=ISORT_CFG)
    return black.format_str(sorted_code, mode=BLACK_MODE)

def summarize_diff(diff_lines: List[str]) -> Tuple[int, int]:
    added = removed = 0
    for ln in diff_lines:
        # ignorar cabeçalhos do diff
        if ln.startswith(("---", "+++", "@@")):
            continue
        if ln.startswith("+"):
            added += 1
        elif ln.startswith("-"):
            removed += 1
    return added, removed

def header(title: str):
    print("\n" + "=" * 100)
    print(title)
    print("=" * 100)

if not os.path.exists(NOTEBOOK):
    raise FileNotFoundError(f"Notebook não encontrado:\n{NOTEBOOK}")

# 3) Leitura do .ipynb
with open(NOTEBOOK, "r", encoding="utf-8") as f:
    nb = nbformat.read(f, as_version=4)

changed_cells = []  # (idx, added, removed, diff_text, preview_snippet, new_code)

# 4) Pré-visualização célula a célula
header("Pré-visualização (NÃO grava) — somente células com mudanças")
for i, cell in enumerate(nb.cells):
    if cell.get("cell_type") != "code":
        continue

    original = cell.get("source", "")
    if not original.strip():
        continue

    # Pular células com magics/shell
    if has_magics(original):
        continue

    try:
        formatted = format_code(original)
    except Exception as e:
        print(f"[Aviso] célula {i}: erro no formatador — pulando ({e})")
        continue

    if original.strip() != formatted.strip():
        # Gerar diff unificado legível
        diff = list(difflib.unified_diff(
            original.splitlines(), formatted.splitlines(),
            fromfile=f"cell_{i}:before", tofile=f"cell_{i}:after", lineterm=""
        ))
        add, rem = summarize_diff(diff)
        snippet = original.strip().splitlines()[0][:120] if original.strip().splitlines() else "<célula vazia>"
        changed_cells.append((i, add, rem, "\n".join(diff), snippet, formatted))

# 5) Exibição dos diffs por célula (se houver)
if not changed_cells:
    print("✔ Nada a alterar: todas as células (não mágicas) já estão conforme isort/black.")
else:
    total_add = total_rem = 0
    for (idx, add, rem, diff_text, snippet, _new) in changed_cells:
        total_add += add
        total_rem += rem
        header(f"Diff — Célula #{idx}  (+{add}/-{rem})")
        print(f"Primeira linha da célula: {snippet!r}\n")
        print(diff_text)

    header("Resumo")
    print(f"Células com mudanças: {len(changed_cells)}")
    print(f"Linhas adicionadas:   {total_add}")
    print(f"Linhas removidas:     {total_rem}")

# 6) Perguntar se aplica
if changed_cells:
    print("\nDigite 'p' para **Proceder** e gravar as mudanças nessas células, ou 'c' para **Cancelar**.")
    try:
        choice = input("Proceder (p) / Cancelar (c): ").strip().lower()
    except Exception:
        choice = "c"

    if choice == "p":
        # Backup antes de escrever
        backup = NOTEBOOK + ".bak"
        if not os.path.exists(backup):
            with open(backup, "w", encoding="utf-8") as bf:
                nbformat.write(nb, bf)

        # Aplicar somente nas células com mudanças
        idx_to_new = {idx: new for (idx, _a, _r, _d, _s, new) in changed_cells}
        for i, cell in enumerate(nb.cells):
            if i in idx_to_new and cell.get("cell_type") == "code":
                cell["source"] = idx_to_new[i]

        # Escrever no .ipynb
        with open(NOTEBOOK, "w", encoding="utf-8") as f:
            nbformat.write(nb, f)

        # Sync delay (Drive)
        time.sleep(1.0)

        header("Concluído")
        print(f"✔ Mudanças aplicadas em {len(changed_cells)} célula(s).")
        print(f"Backup criado em: {backup}")
        print("Dica: recarregue o notebook no Colab para ver a formatação atualizada.")
    else:
        print("\nOperação cancelada. Nada foi gravado.")

##**Sincronizar alterações no código do projeto**
Comandos para sincronizar código (Google Drive, Git, GitHub) e realizar versionamento

---
Google Drive é considerado o ponto de verdade

In [None]:
# @title
#ID0002
#push do Drive -> GitHub (Drive é a fonte da verdade)
#respeita .gitignore do Drive
#sempre em 'main', sem pull, commit + push imediato
#mensagem de commit padronizada com timestamp SP
#bump de versão (M/m/n) + tag anotada
#force push (branch e tags), silencioso; só 1 print final
#PAT lido de segredo do Colab: GITHUB_PAT_AETABULAR (fallback: env; último caso: prompt)

from pathlib import Path
import subprocess, os, re, shutil, sys, getpass
from urllib.parse import quote as urlquote
from datetime import datetime, timezone, timedelta

#utilitários silenciosos
def sh(cmd, cwd=None, check=True):
    """
    Executa comando silencioso. Em erro, levanta RuntimeError com rc e UM rascunho de causa,
    mascarando URLs com credenciais (ex.: https://***:***@github.com/...).
    """
    safe_cmd = []
    for x in cmd:
        if isinstance(x, str) and "github.com" in x and "@" in x:
            #mascara credenciais: https://user:token@ -> https://***:***@
            x = re.sub(r"https://[^:/]+:[^@]+@", "https://***:***@", x)
        safe_cmd.append(x)

    r = subprocess.run(cmd, cwd=cwd, text=True, capture_output=True)
    if check and r.returncode != 0:
        #heurística curtinha p/ tornar rc=128 mais informativo sem vazar nada
        stderr = (r.stderr or "").strip().lower()
        if "authentication failed" in stderr or "permission" in stderr or "not found" in stderr:
            hint = "auth/permissões/URL"
        elif "not a git repository" in stderr:
            hint = "repo local inválido"
        else:
            hint = "git falhou"
        cmd_hint = " ".join(safe_cmd[:3])
        raise RuntimeError(f"rc={r.returncode}; {hint}; cmd={cmd_hint}")
    return r.stdout

def git(*args, cwd=None, check=True):
    return sh(["git", *args], cwd=cwd, check=check)

#configurações do projeto
author_name    = "Leandro Bernardo Rodrigues"
owner          = "LeoBR84p"         # dono do repositório no GitHub
repo_name      = "ae-tabular"    # nome do repositório
default_branch = "main"
repo_dir       = Path("/content/drive/MyDrive/Notebooks/ae-tabular")
remote_base    = f"https://github.com/{owner}/{repo_name}.git"
author_email   = f"bernardo.leandro@gmail.com"  # evita erro de identidade

#nbstripout: "install" para limpar outputs; "disable" para versionar outputs
nbstripout_mode = "install"
import shutil
exe = shutil.which("nbstripout")
git("config", "--local", "filter.nbstripout.clean", exe if exe else "nbstripout", cwd=repo_dir)

#ambiente: Colab + Drive
def ensure_drive():
    try:
        from google.colab import drive  # type: ignore
        base = Path("/content/drive/MyDrive")
        if not base.exists():
            drive.mount("/content/drive")
        if not base.exists():
            raise RuntimeError("Google Drive não montado.")
    except Exception as e:
        raise RuntimeError(f"Falha ao montar o Drive: {e}")

#repo local no Drive
def is_empty_dir(p: Path) -> bool:
    try:
        return p.exists() and not any(p.iterdir())
    except Exception:
        return True

def init_or_recover_repo():
    repo_dir.mkdir(parents=True, exist_ok=True)
    git_dir = repo_dir / ".git"

    def _fresh_init():
        if git_dir.exists():
            shutil.rmtree(git_dir, ignore_errors=True)
        git("init", cwd=repo_dir)

    #caso .git no Colab ausente ou vazia -> init limpo
    if not git_dir.exists() or is_empty_dir(git_dir):
        _fresh_init()
    else:
        #valida se é um work-tree git funcional no Colab; se falhar -> init limpo
        try:
            git("rev-parse", "--is-inside-work-tree", cwd=repo_dir)
        except Exception:
            _fresh_init()

    #aborta operações pendentes (não apaga histórico)
    for args in (("rebase", "--abort"), ("merge", "--abort"), ("cherry-pick", "--abort")):
        try:
            git(*args, cwd=repo_dir, check=False)
        except Exception:
            pass

    #força branch main
    try:
        sh(["git", "switch", "-C", default_branch], cwd=repo_dir)
    except Exception:
        sh(["git", "checkout", "-B", default_branch], cwd=repo_dir)

    #configura identidade local
    try:
        git("config", "user.name", author_name, cwd=repo_dir)
        git("config", "user.email", author_email, cwd=repo_dir)
    except Exception:
        pass

    #marca o diretório como safe
    try:
        sh(["git","config","--global","--add","safe.directory", str(repo_dir)])
    except Exception:
        pass

    #sanity check final (falha cedo se algo ainda estiver errado)
    git("status", "--porcelain", cwd=repo_dir)


#nbstripout (opcional)
def setup_nbstripout():
    if nbstripout_mode == "disable":
        #remove configs do filtro
        sh(["git","config","--local","--unset-all","filter.nbstripout.clean"], cwd=repo_dir, check=False)
        sh(["git","config","--local","--unset-all","filter.nbstripout.smudge"], cwd=repo_dir, check=False)
        sh(["git","config","--local","--unset-all","filter.nbstripout.required"], cwd=repo_dir, check=False)
        gat = repo_dir / ".gitattributes"
        if gat.exists():
            lines = gat.read_text(encoding="utf-8", errors="ignore").splitlines()
            new_lines = [ln for ln in lines if "filter=nbstripout" not in ln]
            gat.write_text("\n".join(new_lines) + ("\n" if new_lines else ""), encoding="utf-8")
        return

    #instala nbstripout (se necessário)
    try:
        import nbstripout  #noqa: F401
    except Exception:
        sh([sys.executable, "-m", "pip", "install", "--quiet", "nbstripout"])

    py = sys.executable
    #configurar filtro sem aspas extras
    git("config", "--local", "filter.nbstripout.clean", "nbstripout", cwd=repo_dir)
    git("config", "--local", "filter.nbstripout.smudge", "cat", cwd=repo_dir)
    git("config", "--local", "filter.nbstripout.required", "true", cwd=repo_dir)
    gat = repo_dir / ".gitattributes"
    line = "*.ipynb filter=nbstripout"
    if gat.exists():
        txt = gat.read_text(encoding="utf-8", errors="ignore")
        if line not in txt:
            gat.write_text((txt.rstrip() + "\n" + line + "\n"), encoding="utf-8")
    else:
        gat.write_text(line + "\n", encoding="utf-8")

#.gitignore normalização
def normalize_tracked_ignored():
    """
    Se houver arquivos já rastreados que hoje são ignorados pelo .gitignore,
    limpa o índice e re-adiciona respeitando o .gitignore.
    Retorna True se normalizou algo; False caso contrário.
    """
    #remove lock de índice, se houver
    lock = repo_dir / ".git/index.lock"
    try:
        if lock.exists():
            lock.unlink()
    except Exception:
        pass

    #garante que o índice existe (ou se recupera)
    idx = repo_dir / ".git/index"
    if not idx.exists():
        try:
            sh(["git", "reset", "--mixed"], cwd=repo_dir)
        except Exception:
            try:
                sh(["git", "init"], cwd=repo_dir)
            except Exception:
                pass

    #detecta arquivos ignorados que estão rastreados e normaliza
    normalized = False
    try:
        out = git("ls-files", "-z", "--ignored", "--exclude-standard", "--cached", cwd=repo_dir)
        tracked_ignored = [p for p in out.split("\x00") if p]
        if tracked_ignored:
            git("rm", "-r", "--cached", ".", cwd=repo_dir)
            git("add", "-A", cwd=repo_dir)
            normalized = True
    except Exception:
        #falhou a detecção? segue o fluxo sem travar
        pass

    return normalized

#semVer e bump de versão
_semver = re.compile(r"^(\d+)\.(\d+)\.(\d+)$")

def parse_semver(s):
    m = _semver.match((s or "").strip())
    return tuple(map(int, m.groups())) if m else None

def current_version():
    try:
        tags = [t for t in git("tag", "--list", cwd=repo_dir).splitlines() if parse_semver(t)]
        if tags:
            return sorted(tags, key=lambda x: parse_semver(x))[-1]
    except Exception:
        pass
    vf = repo_dir / "VERSION"
    if vf.exists():
        v = vf.read_text(encoding="utf-8").strip()
        if parse_semver(v):
            return v
    return "1.0.0"

def bump(v, kind):
    M, m, p = parse_semver(v) or (1, 0, 0)
    k = (kind or "").strip()
    if k == "m":
        return f"{M}.{m+1}.0"
    if k == "n":
        return f"{M}.{m}.{p+1}"
    return f"{M+1}.0.0"  #default major

#timestamp SP
def now_sp():
    #tenta usar zoneinfo; fallback fixo -03:00 (Brasil sem DST atualmente)
    try:
        from zoneinfo import ZoneInfo  # Py3.9+
        tz = ZoneInfo("America/Sao_Paulo")
        dt = datetime.now(tz)
    except Exception:
        dt = datetime.now(timezone(timedelta(hours=-3)))
    #formato legível + offset
    return dt.strftime("%Y-%m-%d %H:%M:%S%z")  # ex.: 2025-10-08 02:34:00-0300

#autenticação (PAT)
def get_pat():
    #Colab Secrets
    token = None
    try:
        from google.colab import userdata  #type: ignore
        token = userdata.get('GITHUB_PAT_AETABULAR')  #nome do segredo criado no Colab
    except Exception:
        token = None
    #fallback1 - variável de ambiente
    if not token:
        token = os.environ.get("GITHUB_PAT_AETABULAR") or os.environ.get("GITHUB_PAT")
    #fallback2 - interativo
    if not token:
        token = getpass.getpass("Informe seu GitHub PAT: ").strip()
    if not token:
        raise RuntimeError("PAT ausente.")
    return token

#listas de força
FORCE_UNTRACK = ["input/", "output/", "data/", "runs/", "logs/", "figures/"]
FORCE_TRACK   = ["references/"]  #versionar tudo dentro (PDFs inclusive)

def force_index_rules():
    #garante que pastas sensíveis NUNCA fiquem rastreadas
    for p in FORCE_UNTRACK:
        try:
            git("rm", "-r", "--cached", "--", p, cwd=repo_dir)
        except Exception:
            pass
    #garante que references/ SEMPRE entre (útil se ainda há *.pdf globais)
    for p in FORCE_TRACK:
        try:
            git("add", "-f", "--", p, cwd=repo_dir)
        except Exception:
            pass

#fluxo principal
def main():
    try:
        ensure_drive()
        init_or_recover_repo()
        setup_nbstripout()

        #pergunta apenas o tipo de versão (M/m/n)
        kind = input("Informe o tipo de mudança: Maior (M), menor (m) ou pontual (n): ").strip()
        if kind not in ("M", "m", "n"):
            kind = "n"

        #versão
        cur = current_version()
        new = bump(cur, kind)
        (repo_dir / "VERSION").write_text(new + "\n", encoding="utf-8")

        #normaliza itens ignorados que estejam rastreados (uma única vez, se necessário)
        normalize_tracked_ignored()

        #aplica regras de força
        force_index_rules()

        #stage de tudo (Drive é a verdade; remoções entram aqui)
        git("add", "-A", cwd=repo_dir)

        #mensagem padronizada de commit
        ts = now_sp()
        commit_msg = f"upload pelo {author_name} em {ts}"
        try:
            git("commit", "-m", commit_msg, cwd=repo_dir)
        except Exception:
            #se nada a commitar, seguimos (pode ocorrer se só a tag mudar, mas aqui VERSION muda)
            status = git("status", "--porcelain", cwd=repo_dir)
            if status.strip():
                raise

        #Tag anotada (substitui se já existir)
        try:
            git("tag", "-a", new, "-m", f"release {new} — {commit_msg}", cwd=repo_dir)
        except Exception:
            sh(["git", "tag", "-d", new], cwd=repo_dir, check=False)
            git("tag", "-a", new, "-m", f"release {new} — {commit_msg}", cwd=repo_dir)

        #push com PAT (Drive é a verdade): validação + push forçado
        token = get_pat()
        user_for_url = owner  # você é o owner; não perguntamos
        auth_url = f"https://{urlquote(user_for_url, safe='')}:{urlquote(token, safe='')}@github.com/{owner}/{repo_name}.git"

        #valida credenciais/URL de forma silenciosa (sem vazar token)
        #tenta checar a branch main; se não existir (repo vazio), faz um probe genérico
        try:
            sh(["git", "ls-remote", auth_url, f"refs/heads/{default_branch}"], cwd=repo_dir)
        except RuntimeError:
            #repositório pode estar vazio (sem refs); probe sem ref deve funcionar
            sh(["git", "ls-remote", auth_url], cwd=repo_dir)

        #push forçado de branch e tags
        sh(["git", "push", "-u", "--force", auth_url, default_branch], cwd=repo_dir)
        sh(["git", "push", "--force", auth_url, "--tags"], cwd=repo_dir)

        print(f"[ok]   Registro no GitHub com sucesso. Versão atual {new}")
    except Exception as e:
        #mensagem única, curta, sem detalhes sensíveis
        msg = str(e) or "falha inesperada"
        print(f"[erro] {msg}")

#executa
if __name__ == "__main__":
    main()

#**Projeto**
Comandos para sincronizar código (Google Drive, Git, GitHub) e realizar versionamento

---
Google Drive é considerado o ponto de verdade

## **Etapa 1:** Ativação do ambiente virtual
---
Monta o Google Drive, define a BASE e REPO do projeto Git, cria/ativa o ambiente virtual.

---

In [None]:
# @title
#ID0003
#inicialização robusta: Drive + venv fora do Drive + Git checks (com patch de venv/ensurepip) { display-mode: "form" }
#força clear do kernel/variáveis desta sessão
%reset -f

#imports básicos -----
from google.colab import drive
from IPython.display import display, HTML
import json, os, sys, time, shutil, pathlib, subprocess

#helper de subprocess -----
def run(cmd, check=True, cwd=None):
    r = subprocess.run(cmd, text=True, capture_output=True, cwd=cwd)
    if check and r.returncode != 0:
        print(r.stdout + r.stderr)
        raise RuntimeError(f"falha: {' '.join(cmd)} (rc={r.returncode})")
    return r.stdout.strip()

#funções utilitárias de Drive/FS -----
def _is_mount_active(mountpoint: str = "/content/drive"):
    """verifica em /proc/mounts se o mountpoint está realmente montado"""
    try:
        with open("/proc/mounts", "r") as f:
            for line in f:
                parts = line.split()
                if len(parts) > 1 and parts[1] == mountpoint:
                    return True
    except Exception:
        pass
    return False

def _cleanup_local_mountpoint(mountpoint: str = "/content/drive"):
    """limpa conteúdo local do mountpoint quando NÃO está montado"""
    if os.path.isdir(mountpoint) and os.listdir(mountpoint):
        print(f"[info] mountpoint '{mountpoint}' contém arquivos locais. limpando...")
        for name in os.listdir(mountpoint):
            p = os.path.join(mountpoint, name)
            try:
                if os.path.isfile(p) or os.path.islink(p):
                    os.remove(p)
                else:
                    shutil.rmtree(p)
            except Exception as e:
                print(f"[aviso] não foi possível remover {p}: {e}")
        print("[ok] limpeza concluída.")

def safe_mount_google_drive(preferred_mountpoint: str = "/content/drive", readonly: bool = False, timeout_ms: int = 120000):
    """desmonta se preciso, limpa o mountpoint local e monta o drive"""
    try:
        if _is_mount_active(preferred_mountpoint):
          # print("[info] drive montado. tentando desmontar...")
          drive.flush_and_unmount()
          for _ in range(50):
              if not _is_mount_active(preferred_mountpoint):
                  break
              time.sleep(0.2)
    except Exception:
        pass

    if not _is_mount_active(preferred_mountpoint):
        _cleanup_local_mountpoint(preferred_mountpoint)

    os.makedirs(preferred_mountpoint, exist_ok=True)
    if os.listdir(preferred_mountpoint):
        alt = "/mnt/drive"
        print(f"[aviso] '{preferred_mountpoint}' ainda não está vazio. usando alternativo '{alt}'.")
        os.makedirs(alt, exist_ok=True)
        mountpoint = alt
    else:
        mountpoint = preferred_mountpoint

    print(f"[info] montando o google drive em '{mountpoint}'...")
    drive.mount(mountpoint, force_remount=True, timeout_ms=timeout_ms, readonly=readonly)
    print("[ok]   drive montado com sucesso.")
    return mountpoint

def safe_chdir(path):
    """usa os.chdir com validações, evitando %cd"""
    if not os.path.exists(path):
        raise FileNotFoundError(f"caminho não existe: {path}")
    os.chdir(path)
    print("[ok]   diretório atual:", os.getcwd())

#parâmetros do projeto -----
GITHUB_OWNER = "LeoBR84p"
GITHUB_REPO  = "ae-tabular"
CLEAN_URL    = f"https://github.com/{GITHUB_OWNER}/{GITHUB_REPO}.git"

#montar/remontar o google drive (robusto)
MOUNTPOINT = safe_mount_google_drive("/content/drive")
BASE = f"{MOUNTPOINT}/MyDrive/Notebooks"  #ajuste se quiser
REPO = "ae-tabular"
PROJ = f"{BASE}/{REPO}"
os.makedirs(BASE, exist_ok=True)

#venv fora do drive (mais rápido e evita sync)
VENV_PATH = "/content/.venv_data"
VENV_BIN  = f"{VENV_PATH}/bin"
VENV_PY   = f"{VENV_BIN}/python"
VENV_PIP  = f"{VENV_BIN}/pip"   #pode não existir ainda se o venv foi criado sem pip

#criação do venv com fallback para 'virtualenv'
def create_or_repair_venv(venv_path: str, venv_python: str):
    if not os.path.exists(VENV_BIN):
        #print(f"[info] criando venv (stdlib) em {venv_path} --without-pip ...")
        try:
            run([sys.executable, "-m", "venv", "--without-pip", venv_path], check=True)
            print("[ok]   venv criado (sem pip).")
        except Exception as e:
            print(f"[aviso] venv(stdlib) falhou: {e}")
            #print("[info] instalando 'virtualenv' e criando venv alternativo com pip embutido...")
            run([sys.executable, "-m", "pip", "install", "-q", "--upgrade", "virtualenv"], check=True)
            run([sys.executable, "-m", "virtualenv", "--python", sys.executable, venv_path], check=True)
            print("[ok]   venv criado via virtualenv.")
    else:
        print(f"[ok]   venv já existe em {venv_path}")

create_or_repair_venv(VENV_PATH, VENV_PY)

#ajusta PATH antes de qualquer instalação
os.environ["PATH"] = f"{VENV_BIN}{os.pathsep}{os.environ['PATH']}"
os.environ["VIRTUAL_ENV"] = VENV_PATH
print("[ok]   venv adicionado ao PATH")

#garante pip dentro do venv (ensurepip -> fallback virtualenv)
def _ensure_pip_in_venv(vpy: str):
    try:
        run([vpy, "-m", "pip", "--version"], check=True)
        return True
    except Exception:
        #print("[info] pip ausente no venv. tentando ensurepip dentro do venv...")
        try:
            run([vpy, "-m", "ensurepip", "--upgrade", "--default-pip"], check=True)
            run([vpy, "-m", "pip", "install", "-q", "--upgrade", "pip", "setuptools", "wheel"], check=True)
            return True
        except Exception as e:
            #print(f"[aviso] ensurepip no venv falhou: {e}")
            #print("[info] fallback: usando virtualenv para semear o pip dentro do venv existente...")
            run([sys.executable, "-m", "pip", "install", "-q", "--upgrade", "virtualenv"], check=True)
            run([sys.executable, "-m", "virtualenv", "--python", vpy, VENV_PATH], check=True)
            run([vpy, "-m", "pip", "install", "-q", "--upgrade", "pip", "setuptools", "wheel"], check=True)
            return True

if not _ensure_pip_in_venv(VENV_PY):
    raise RuntimeError("não foi possível provisionar o pip dentro do venv")

# garante que os pacotes instalados no venv sejam visíveis para este kernel
_ver = subprocess.check_output([VENV_PY, "-c", "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')"], text=True).strip()
_site_dir = f"{VENV_PATH}/lib/python{_ver}/site-packages"
if _site_dir not in sys.path:
    sys.path.insert(0, _site_dir)
print("[ok]   site-packages do venv adicionado ao sys.path:", _site_dir)

#instala dependências de sessão DENTRO do venv
print("[info] instalando pacotes no venv...")
run([VENV_PY, "-m", "pip", "install", "-q", "jupytext", "nbdime", "nbstripout"])

#habilita integração do nbdime com git (global)
print("[info] habilitando nbdime em git config --global ...")
run([VENV_PY, "-m", "nbdime", "config-git", "--enable", "--global"])

#checks do repositório git + navegação até a pasta do projeto
if not os.path.exists(PROJ):
    print(f"[aviso] pasta do projeto não encontrada em {PROJ}.")
else:
    print("[ok]   pasta do projeto encontrada.")
    safe_chdir(PROJ)
    if not os.path.isdir(".git"):
        print("[aviso] esta pasta não parece ser um repositório Git (.git ausente).")
    else:
        print("[ok]   repositório Git detectado.")

# resumo do ambiente (confirmação objetiva e detalhada)
kernel_py = sys.executable
venv_py = VENV_PY
site_dir = _site_dir

# verifica se o site-packages do venv está no sys.path
site_ok = site_dir in sys.path

# obtém versões e caminhos
try:
    py_ver = subprocess.check_output([venv_py, "-V"], text=True).strip()
    pip_ver = subprocess.check_output([venv_py, "-m", "pip", "--version"], text=True).strip()
    pip_path = subprocess.check_output(
        [venv_py, "-m", "pip", "show", "pip"], text=True, stderr=subprocess.DEVNULL
    )
    pip_path_line = next((l for l in pip_path.splitlines() if l.startswith("Location:")), "")
except subprocess.CalledProcessError:
    py_ver, pip_ver, pip_path_line = "erro", "erro", ""

# imprime status linha a linha
print(f"[ok]   venv habilitado" if venv_py else "[erro] venv não encontrado")
print(f"[info] python em uso: {kernel_py}")
print(f"[info] versão do python: {py_ver}")
print(f"[ok]   pip do venv ativo" if "pip" in pip_ver.lower() else "[erro] pip do venv não detectado")
print(f"[info] caminho do pip: {venv_py.replace('python','pip')}")
print(f"[ok]   site-packages no sys.path: {site_dir}" if site_ok else f"[erro] site-packages ausente no sys.path: {site_dir}")
print(f"[info] versão do pip: {pip_ver}")

#all BS below
#mensagem com humor (skynet)
from IPython.display import display, HTML
display(HTML('<div style="margin:12px 0;padding:8px 12px;border:1px dashed #999;">'
             '<b>🤖 Skynet</b>: T-800 ativado. Diagnóstico do ambiente concluído. '
             '🎯 Alvo principal: organização do notebook e venv fora do drive.'
             '</div>'))

In [None]:
# @title
#ID003
# imports principais
from pathlib import Path
from datetime import datetime
from zoneinfo import ZoneInfo
import json, os, getpass, platform, subprocess, sys

# configurações do projeto (já fornecidas)
author_name    = "Leandro Bernardo Rodrigues"
owner          = "LeoBR84p"         # dono do repositório no GitHub
repo_name      = "ae-tabular"       # nome do repositório
default_branch = "main"
repo_dir       = Path("/content/drive/MyDrive/Notebooks/ae-tabular")
remote_base    = f"https://github.com/{owner}/{repo_name}.git"
author_email   = f"bernardo.leandro@gmail.com"  # evita erro de identidade

# zona de tempo brasilia
TZ_BR = ZoneInfo("America/Sao_Paulo")

def skynet(msg: str):
    """log simples padronizado"""
    ts = datetime.now(TZ_BR).strftime("%Y-%m-%d %H:%M:%S")
    print(f"[skynet {ts}] {msg}")

# 1) montar o google drive se necessário
try:
    from google.colab import drive as _colab_drive  # type: ignore
    if not os.path.ismount("/content/drive"):
        skynet("montando google drive…")
        _colab_drive.mount("/content/drive")
    else:
        skynet("google drive já montado")
except Exception:
    skynet("ambiente não é colab ou google drive indisponível, prosseguindo assim mesmo")

# 2) criar estrutura de pastas do projeto
INPUT_DIR  = repo_dir / "input"
OUTPUT_DIR = repo_dir / "output"

os.makedirs(INPUT_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)

# 3) criar subpasta de execução em output com carimbo de data e hora de brasília
run_stamp = datetime.now(TZ_BR).strftime("%Y-%m-%d_%H-%M-%S")
RUN_DIR = OUTPUT_DIR / run_stamp
os.makedirs(RUN_DIR, exist_ok=True)

# 4) opcional: escrever metadados mínimos da execução
run_meta = {
    "author_name": author_name,
    "author_email": author_email,
    "project": repo_name,
    "run_stamp_br": run_stamp,
    "timezone": "America/Sao_Paulo",
    "python_version": sys.version.split()[0],
    "platform": platform.platform(),
    "uid": getpass.getuser(),
}
with open(RUN_DIR / "run.json", "w", encoding="utf-8") as f:
    json.dump(run_meta, f, ensure_ascii=False, indent=2)

# 5) git opcional: configurar identidade local se este for um repositório git
def _git(cmd, cwd=None):
    return subprocess.run(["git", *cmd], cwd=cwd, text=True, capture_output=True)

if (repo_dir / ".git").exists():
    _git(["config", "user.name", author_name], cwd=repo_dir)
    _git(["config", "user.email", author_email], cwd=repo_dir)
    skynet("git detectado e identidade configurada no repositório")
else:
    skynet("repositório git não detectado em repo_dir, etapa git será ignorada por enquanto")

# 6) impressão de caminhos úteis
skynet("ambiente preparado com sucesso")
print("repo_dir:   ", repo_dir)
print("input_dir:  ", INPUT_DIR)
print("output_dir: ", OUTPUT_DIR)
print("run_dir:    ", RUN_DIR)


## **Etapa 2:** Ingestão de dados

In [None]:
# @title
# parágrafo 2: ingestão de journal entries (csv utf-8 bom ;), seleção por índice e simulação 50k

# imports
import os, sys, json, math, random
from pathlib import Path
from datetime import datetime
from zoneinfo import ZoneInfo
import pandas as pd
import numpy as np

# dependências do parágrafo 1
assert 'RUN_DIR' in globals() and 'repo_dir' in globals(), "execute o parágrafo 1 antes."
assert 'INPUT_DIR' in globals() and 'OUTPUT_DIR' in globals(), "execute o parágrafo 1 antes."
assert 'TZ_BR' in globals() and callable(skynet), "execute o parágrafo 1 antes."

# parâmetros de leitura obrigatórios
CSV_ENCODING = "utf-8-sig"   # utf-8 com bom
CSV_SEP      = ";"           # separador ponto e vírgula

# colunas esperadas
REQUIRED_COLS = [
    "username",        # nome do usuário
    "lotacao",         # unidade/lotação
    "tipoconta",       # Ativo, Passivo, Resultado, Receita, Despesa, Outras
    "valormi",         # valor em reais com 2 casas decimais
    "dc",              # 'd' para débito, 'c' para crédito
    "contacontabil",   # conta contábil COSIF (número)
]

# categorias válidas
TIPOCONTA_VALIDAS = {"Ativo","Passivo","Resultado","Receita","Despesa","Outras"}
DC_VALIDOS        = {"d","c"}

# utilitário: listar csvs no google drive do usuário (mydrive), ordenados por data desc
def listar_csvs_mydrive(max_files=2000):
    raiz = Path("/content/drive/MyDrive")
    arquivos = []
    for p in raiz.rglob("*.csv"):
        try:
            stat = p.stat()
            arquivos.append((p, stat.st_mtime, stat.st_size))
            if len(arquivos) >= max_files:
                break
        except Exception:
            continue
    # ordena por mtime desc
    arquivos.sort(key=lambda x: x[1], reverse=True)
    return arquivos

# utilitário: impressão amigável de bytes
def _fmt_size(n):
    for u in ["B","KB","MB","GB","TB"]:
        if n < 1024:
            return f"{n:.1f}{u}"
        n /= 1024
    return f"{n:.1f}PB"

# geração de dataset simulado (50k linhas) balanceado por partidas dobradas
def gerar_csv_simulado_50k(dest_dir: Path) -> Path:
    # mapeia tipoconta -> prefixo cosif plausível
    cosif_prefix = {
        "Ativo": "1",
        "Passivo": "2",
        "Resultado": "3",
        "Receita": "7",
        "Despesa": "8",
        "Outras": "9",
    }
    # listas de apoio
    lotacoes = [f"UNID-{i:03d}" for i in range(1, 121)]
    usuarios = [f"user{i:04d}" for i in range(1, 3001)]
    tipos    = list(cosif_prefix.keys())
    # número de pares (d,c)
    n_pairs = 25_000  # 50k linhas
    rng = np.random.default_rng(seed=42)

    rows = []
    for _ in range(n_pairs):
        # valor do par, duas casas
        base_val = float(np.round(rng.uniform(10.0, 50_000.0), 2))
        # pequena variação entre d e c para simular centavos e depois ajustar
        d_val = float(np.round(base_val * rng.uniform(0.5, 1.5), 2))
        c_val = d_val  # assegura partida dobrada perfeita

        # escolhe atributos débito
        t_d  = random.choice(tipos)
        lot_d = random.choice(lotacoes)
        usr_d = random.choice(usuarios)
        cc_d  = f"{cosif_prefix[t_d]}{rng.integers(0, 10_000_000):07d}"

        # escolhe atributos crédito
        t_c  = random.choice(tipos)
        lot_c = random.choice(lotacoes)
        usr_c = random.choice(usuarios)
        cc_c  = f"{cosif_prefix[t_c]}{rng.integers(0, 10_000_000):07d}"

        rows.append((usr_d, lot_d, t_d, d_val, "d", cc_d))
        rows.append((usr_c, lot_c, t_c, c_val, "c", cc_c))

    df = pd.DataFrame(rows, columns=REQUIRED_COLS)

    # garante duas casas decimais no export; manteremos float em memória
    stamp = datetime.now(TZ_BR).strftime("%Y-%m-%d_%H-%M-%S")
    dest_dir.mkdir(parents=True, exist_ok=True)
    out_path = dest_dir / f"journal_simulado_{stamp}.csv"

    # exporta com utf-8-sig e separador ';'
    # formata valormi com duas casas decimais
    df_fmt = df.copy()
    df_fmt["valormi"] = df_fmt["valormi"].map(lambda x: f"{x:.2f}")
    df_fmt.to_csv(out_path, index=False, sep=CSV_SEP, encoding=CSV_ENCODING)

    return out_path

# etapa a) gerar um csv simulado em INPUT_DIR para protótipo
sim_path = gerar_csv_simulado_50k(INPUT_DIR)
skynet(f"csv simulado gerado: {sim_path}")

# etapa b) listar csvs do mydrive para seleção por índice
skynet("varrendo csvs em /content/drive/MyDrive (isso pode levar algum tempo em drives grandes)")
csvs = listar_csvs_mydrive(max_files=2000)

# inclui o simulado recém-gerado no topo da lista para facilitar
csvs = [(sim_path, sim_path.stat().st_mtime, sim_path.stat().st_size)] + csvs

# imprime lista
print("\narquivos csv encontrados (índice, tamanho, caminho):")
for i, (p, mtime, sz) in enumerate(csvs):
    print(f"[{i:03d}] {_fmt_size(sz):>8}  {str(p)}")

# seleção por índice
while True:
    try:
        sel = input("\ndigite o índice do csv desejado e pressione enter: ").strip()
        idx = int(sel)
        assert 0 <= idx < len(csvs)
        SELECTED_CSV = Path(csvs[idx][0])
        break
    except Exception as e:
        print(f"índice inválido. tente novamente. erro: {e}")

skynet(f"arquivo selecionado: {SELECTED_CSV}")

# etapa c) validar formato (encoding/delimitador/colunas) e carregar dataframe
def carregar_validar_csv(path: Path) -> pd.DataFrame:
    # leitura com requisitos obrigatórios
    df = pd.read_csv(path, sep=CSV_SEP, encoding=CSV_ENCODING, dtype=str)
    # remove espaços nas colunas e valores
    df.columns = [c.strip() for c in df.columns]
    df = df.applymap(lambda x: x.strip() if isinstance(x, str) else x)

    # checa colunas requeridas
    faltantes = [c for c in REQUIRED_COLS if c not in df.columns]
    if faltantes:
        raise ValueError(f"colunas ausentes: {faltantes}. esperado: {REQUIRED_COLS}")

    # tipagem: valormi -> float com duas casas (ponto decimal)
    # se vier com vírgula decimal por engano, converte
    def _parse_val(v):
        v = (v or "").replace(".", "").replace(",", ".") if isinstance(v, str) and v.count(",")==1 and v.count(".")>1 else v
        v = (v or "").replace(",", ".") if isinstance(v, str) else v
        try:
            return round(float(v), 2)
        except Exception:
            return np.nan

    df["valormi"] = df["valormi"].apply(_parse_val).astype(float)

    # normaliza dc
    df["dc"] = df["dc"].str.lower()
    # normaliza tipoconta (capitalização 1ª maiúscula)
    df["tipoconta"] = df["tipoconta"].str.capitalize()

    # validações de conteúdo
    problemas = {}
    if not set(REQUIRED_COLS).issubset(df.columns):
        problemas["colunas_invalidas"] = list(set(REQUIRED_COLS) - set(df.columns))
    if (~df["dc"].isin(DC_VALIDOS)).any():
        problemas["dc_invalido"] = int((~df["dc"].isin(DC_VALIDOS)).sum())
    if (~df["tipoconta"].isin(TIPOCONTA_VALIDAS)).any():
        problemas["tipoconta_invalida"] = int((~df["tipoconta"].isin(TIPOCONTA_VALIDAS)).sum())
    if df["valormi"].isna().any():
        problemas["valormi_na"] = int(df["valormi"].isna().sum())
    if (df["valormi"] < 0).any():
        problemas["valormi_negativo"] = int((df["valormi"] < 0).sum())

    # contacontabil: manter como string numérica
    df["contacontabil"] = df["contacontabil"].astype(str)
    if (~df["contacontabil"].str.fullmatch(r"\d+")).any():
        problemas["contacontabil_nao_numerica"] = int((~df["contacontabil"].str.fullmatch(r"\d+")).sum())

    if problemas:
        report_path = RUN_DIR / "validacao_ingestao.json"
        with open(report_path, "w", encoding="utf-8") as f:
            json.dump(problemas, f, ensure_ascii=False, indent=2)
        raise ValueError(f"falhas de validação encontradas. veja {report_path}")

    return df

DF_RAW = carregar_validar_csv(SELECTED_CSV)
skynet(f"csv carregado com sucesso: {len(DF_RAW):,} linhas")

# etapa d) persistir cópia da fonte e snapshot parquet no run_dir
# cópia padronizada do csv selecionado para rastreabilidade
src_copy = RUN_DIR / f"selected_source.csv"
if SELECTED_CSV.resolve() != src_copy.resolve():
    try:
        # reexporta com formatação padronizada garantida (utf-8-sig ; e valormi com 2 casas)
        df_exp = DF_RAW.copy()
        df_exp["valormi"] = df_exp["valormi"].map(lambda x: f"{x:.2f}")
        df_exp.to_csv(src_copy, index=False, sep=CSV_SEP, encoding=CSV_ENCODING)
        skynet(f"fonte padronizada salva em {src_copy}")
    except Exception as e:
        skynet(f"aviso: não foi possível salvar cópia padronizada do csv ({e})")

# salva snapshot parquet para processamento rápido nas próximas fases
snap_path = RUN_DIR / "journal_entries.parquet"
DF_RAW.to_parquet(snap_path, index=False)
skynet(f"snapshot parquet salvo: {snap_path}")

# imprime um resumo inicial
print("\nvisão geral:")
print(DF_RAW.head(5))
print("\ncontagem por dc:\n", DF_RAW["dc"].value_counts(dropna=False))
print("\ncontagem por tipoconta:\n", DF_RAW["tipoconta"].value_counts(dropna=False))

## **Etapa 3:** Limpeza, configurações e split (train/val)

In [None]:
# parágrafo 3: limpeza, engenharia de features e split train/val

import os, json, math, re, pickle, random
from pathlib import Path
import numpy as np
import pandas as pd

# dependências do parágrafo 1 e 2
assert 'RUN_DIR' in globals() and 'snap_path' in globals() and 'skynet' in globals(), "execute os parágrafos 1 e 2 antes."
assert Path(snap_path).exists(), "snapshot parquet ausente. finalize o parágrafo 2."

# importa dados brutos do snapshot
DF = pd.read_parquet(snap_path)

# 1) limpeza básica
# remove espaços extras novamente e normaliza casos (segurança)
for c in DF.columns:
    if DF[c].dtype == object:
        DF[c] = DF[c].astype(str).str.strip()

DF["dc"] = DF["dc"].str.lower().map({"d": "d", "c": "c"})  # garante domínio
DF["tipoconta"] = DF["tipoconta"].str.capitalize()

# garante numérico valormi (já veio como float do parágrafo 2)
DF["valormi"] = DF["valormi"].astype(float)

# remove linhas com falhas críticas
mask_ok = (
    DF["username"].notna() &
    DF["lotacao"].notna() &
    DF["tipoconta"].notna() &
    DF["dc"].isin({"d","c"}) &
    DF["contacontabil"].str.fullmatch(r"\d+") &
    DF["valormi"].notna() & (DF["valormi"] >= 0)
)
removed = (~mask_ok).sum()
DF = DF.loc[mask_ok].reset_index(drop=True)
skynet(f"linhas removidas por validação adicional: {removed}")

# 2) features auxiliares
# 2.1) decomposições COSIF (prefixos para hierarquia)
DF["cosif_len"] = DF["contacontabil"].str.len().clip(upper=12).astype(int)
DF["cosif_p1"]  = DF["contacontabil"].str[0]          # 1 dígito
DF["cosif_p2"]  = DF["contacontabil"].str[:2]         # 2 dígitos
DF["cosif_p3"]  = DF["contacontabil"].str[:3]         # 3 dígitos

# 2.2) transformações em valor
DF["valormi_log1p"] = np.log1p(DF["valormi"])
# (opcional) normalizações por grupo virão como z-scores abaixo

# 2.3) frequency encoding para categorias grandes
def freq_encode(series: pd.Series) -> pd.Series:
    freq = series.value_counts(dropna=False)
    return series.map(freq).astype(float)

DF["freq_username"]      = freq_encode(DF["username"])
DF["freq_lotacao"]       = freq_encode(DF["lotacao"])
DF["freq_contacontabil"] = freq_encode(DF["contacontabil"])
DF["freq_cosif_p2"]      = freq_encode(DF["cosif_p2"])
DF["freq_cosif_p3"]      = freq_encode(DF["cosif_p3"])

# 2.4) estatísticas por grupo (média, desvio e z-score de valormi)
def add_group_stats(df: pd.DataFrame, key: str, val_col: str = "valormi"):
    g = df.groupby(key)[val_col].agg(["mean","std","median"]).rename(
        columns={"mean":f"{key}_mean", "std":f"{key}_std", "median":f"{key}_median"}
    )
    df = df.join(g, on=key)
    # evita divisão por zero
    df[f"{key}_std"] = df[f"{key}_std"].replace(0, np.nan)
    df[f"{key}_z"]   = (df[val_col] - df[f"{key}_mean"]) / df[f"{key}_std"]
    df[f"{key}_mad"] = (df[val_col] - df[f"{key}_median"]).abs()
    return df

for k in ["username", "lotacao", "contacontabil", "cosif_p2", "cosif_p3"]:
    DF = add_group_stats(DF, k, "valormi")

# 2.5) codificação de baixo cardinalidade
# dc: binária (d=1, c=0); tipoconta: one-hot
DF["dc_bin"] = DF["dc"].map({"d":1, "c":0}).astype(int)
tipos = sorted(DF["tipoconta"].dropna().unique().tolist())
tipodummies = pd.get_dummies(DF["tipoconta"], prefix="tipo", dtype=int)
DF = pd.concat([DF, tipodummies], axis=1)

# 3) matriz de features final
num_cols = [
    "valormi", "valormi_log1p",
    "freq_username", "freq_lotacao", "freq_contacontabil", "freq_cosif_p2", "freq_cosif_p3",
    "cosif_len",
    "username_mean","username_std","username_z","username_median","username_mad",
    "lotacao_mean","lotacao_std","lotacao_z","lotacao_median","lotacao_mad",
    "contacontabil_mean","contacontabil_std","contacontabil_z","contacontabil_median","contacontabil_mad",
    "cosif_p2_mean","cosif_p2_std","cosif_p2_z","cosif_p2_median","cosif_p2_mad",
    "cosif_p3_mean","cosif_p3_std","cosif_p3_z","cosif_p3_median","cosif_p3_mad",
    "dc_bin",
]
oh_cols = tipodummies.columns.tolist()

# garante existência de todas as colunas numericas (caso algum grupo não crie std etc.)
for c in num_cols:
    if c not in DF.columns:
        DF[c] = np.nan

FEATURE_COLS = num_cols + oh_cols
DF_FE = DF[FEATURE_COLS].copy()

# 4) imputação simples e padronização
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler

imputer = SimpleImputer(strategy="median")
scaler  = StandardScaler()

X_imp = imputer.fit_transform(DF_FE.values)
X_std = scaler.fit_transform(X_imp)

# 5) split não supervisionado (aleatório reprodutível)
from sklearn.model_selection import train_test_split
X_train, X_val = train_test_split(X_std, test_size=0.2, random_state=42, shuffle=True)

skynet(f"features geradas: {DF_FE.shape[1]} colunas; amostras: {DF_FE.shape[0]:,}")
skynet(f"split realizado: train={X_train.shape[0]:,} | val={X_val.shape[0]:,}")

# 6) persistência de artefatos para as próximas fases
artifacts = {
    "feature_cols": FEATURE_COLS,
    "tipos": tipos,
    "imputer": imputer,
    "scaler": scaler,
}
with open(RUN_DIR / "features.pkl", "wb") as f:
    pickle.dump(artifacts, f)

np.savez(RUN_DIR / "dataset_npz.npz", X_train=X_train, X_val=X_val)

# 7) diagnóstico rápido
summary = {
    "n_rows": int(DF_FE.shape[0]),
    "n_features": int(DF_FE.shape[1]),
    "train_rows": int(X_train.shape[0]),
    "val_rows": int(X_val.shape[0]),
    "one_hot_tipoconta_cols": oh_cols,
}
with open(RUN_DIR / "features_summary.json", "w", encoding="utf-8") as f:
    json.dump(summary, f, ensure_ascii=False, indent=2)

print("amostra das primeiras 5 linhas de features padronizadas (numpy):")
print(X_std[:5])

skynet(f"artefatos salvos em {RUN_DIR}: features.pkl e dataset_npz.npz")

# complemento do parágrafo 3: estatísticas por grupo e total de linhas removidas

# imprime quantas linhas foram removidas por falhas críticas na validação
print(f"\nlinhas removidas por falhas críticas (parágrafo 3): {removed}")

# utilitário para exibir dataframes no colab/notebook
try:
    from IPython.display import display
except Exception:
    display = print  # fallback simples

# função de resumo por grupo e identificação de outliers via |z|
def resumo_por_grupo(df: pd.DataFrame, key: str, top_n: int = 10):
    # resumo estatístico por grupo (valormi)
    agg = (
        df.groupby(key)["valormi"]
          .agg(n="size", media="mean", desvio="std", mediana="median", minimo="min", maximo="max")
          .sort_values("n", ascending=False)
    )

    print(f"\n=== resumo por grupo: {key} (top {top_n} por contagem) ===")
    display(agg.head(top_n))

    # salva o resumo completo em csv (utf-8-sig ; )
    resumo_path = RUN_DIR / f"resumo_{key}.csv"
    agg.to_csv(resumo_path, sep=";", encoding="utf-8-sig")
    skynet(f"resumo por grupo '{key}' salvo em {resumo_path}")

    # top outliers por |z| no grupo, usando as colunas já criadas no parágrafo 3
    zcol = f"{key}_z"
    mean_col = f"{key}_mean"
    std_col  = f"{key}_std"

    if zcol in df.columns and mean_col in df.columns and std_col in df.columns:
        out = (
            df[[key, "valormi", zcol, mean_col, std_col]]
            .dropna(subset=[zcol])
            .assign(abs_z=lambda x: x[zcol].abs())
            .sort_values("abs_z", ascending=False)
            .head(top_n)
        )
        print(f"\n--- top {top_n} potenciais outliers por |z| em {key} ---")
        display(out)

        out_path = RUN_DIR / f"outliers_{key}.csv"
        out.to_csv(out_path, sep=";", encoding="utf-8-sig", index=False)
        skynet(f"outliers por |z| '{key}' salvos em {out_path}")
    else:
        print(f"(aviso) colunas de z-score não encontradas para '{key}' — verifique a etapa de features.")

# executa para os principais agrupamentos contábeis
for k in ["username", "lotacao", "contacontabil", "cosif_p2", "cosif_p3"]:
    resumo_por_grupo(DF, k, top_n=10)

# **Etapa 4:** Autoencoder utilizando treino com early-stopping

In [None]:
# @title
# parágrafo 4: autoencoder denso (pytorch) com early-stopping e artefatos

# imports
import os, json, math, time, random, pickle
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# dependências anteriores
assert 'RUN_DIR' in globals() and 'skynet' in globals(), "execute os parágrafos anteriores."
ds_path = RUN_DIR / "dataset_npz.npz"
assert ds_path.exists(), "dataset_npz.npz ausente — finalize o parágrafo 3."
with np.load(ds_path) as npz:
    X_train = npz["X_train"].astype(np.float32)
    X_val   = npz["X_val"].astype(np.float32)

# tenta importar torch
try:
    import torch
    from torch import nn
    from torch.utils.data import TensorDataset, DataLoader
except Exception as e:
    raise RuntimeError("pytorch não disponível. instale no colab com: pip install torch --quiet") from e

# dispositivo
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
skynet(f"treino em dispositivo: {device}")

# reprodutibilidade
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)

# dataset e dataloaders
BATCH_SIZE = 1024
train_ds = TensorDataset(torch.from_numpy(X_train), torch.from_numpy(X_train))
val_ds   = TensorDataset(torch.from_numpy(X_val),   torch.from_numpy(X_val))
train_dl = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True,  num_workers=0, pin_memory=torch.cuda.is_available())
val_dl   = DataLoader(val_ds,   batch_size=BATCH_SIZE, shuffle=False, num_workers=0, pin_memory=torch.cuda.is_available())

# configuração do modelo
input_dim = X_train.shape[1]
hidden_dims = [256, 128, 64]  # você pode ajustar
dropout_p = 0.05

class AutoEncoder(nn.Module):
    def __init__(self, in_dim, h_dims, dropout=0.0):
        super().__init__()
        enc = []
        last = in_dim
        for h in h_dims:
            enc += [nn.Linear(last, h), nn.ReLU(), nn.Dropout(dropout)]
            last = h
        self.encoder = nn.Sequential(*enc)

        dec = []
        rev = list(reversed(h_dims))
        last = rev[0]
        for h in rev[1:]:
            dec += [nn.Linear(last, h), nn.ReLU(), nn.Dropout(dropout)]
            last = h
        dec += [nn.Linear(last, in_dim)]  # camada de saída linear
        self.decoder = nn.Sequential(*dec)

    def forward(self, x):
        z = self.encoder(x)
        y = self.decoder(z)
        return y

model = AutoEncoder(input_dim, hidden_dims, dropout=dropout_p).to(device)
skynet(f"modelo criado: input_dim={input_dim}, hidden={hidden_dims}, dropout={dropout_p}")

# otimizador e perda
LR = 1e-3
optimizer = torch.optim.Adam(model.parameters(), lr=LR)
criterion = nn.MSELoss(reduction="mean")

# early-stopping
MAX_EPOCHS = 200
PATIENCE   = 15
MIN_DELTA  = 1e-5

history = {"epoch": [], "train_loss": [], "val_loss": []}
best_val = float("inf")
best_epoch = -1
no_improve = 0
best_path = RUN_DIR / "ae.pt"

def epoch_pass(dataloader, train: bool):
    if train:
        model.train()
    else:
        model.eval()
    total_loss = 0.0
    count = 0
    with torch.set_grad_enabled(train):
        for xb, yb in dataloader:
            xb = xb.to(device, non_blocking=True)
            yb = yb.to(device, non_blocking=True)
            if train:
                optimizer.zero_grad(set_to_none=True)
            yhat = model(xb)
            loss = criterion(yhat, yb)
            if train:
                loss.backward()
                optimizer.step()
            total_loss += loss.item() * xb.size(0)
            count += xb.size(0)
    return total_loss / max(count, 1)

skynet("iniciando treino do autoencoder")
t0 = time.time()
for epoch in range(1, MAX_EPOCHS + 1):
    tr_loss = epoch_pass(train_dl, train=True)
    vl_loss = epoch_pass(val_dl, train=False)

    history["epoch"].append(epoch)
    history["train_loss"].append(tr_loss)
    history["val_loss"].append(vl_loss)

    # logs ocasionais
    if epoch == 1 or epoch % 5 == 0:
        skynet(f"epoch {epoch:03d}  train={tr_loss:.6f}  val={vl_loss:.6f}")

    # early-stopping
    if vl_loss + MIN_DELTA < best_val:
        best_val = vl_loss
        best_epoch = epoch
        no_improve = 0
        torch.save({"model_state": model.state_dict(),
                    "input_dim": input_dim,
                    "hidden_dims": hidden_dims,
                    "dropout_p": dropout_p}, best_path)
    else:
        no_improve += 1
        if no_improve >= PATIENCE:
            skynet(f"early-stopping em epoch {epoch} (sem melhoria por {PATIENCE} épocas)")
            break

t1 = time.time()
skynet(f"treino finalizado em {(t1 - t0):.1f}s; melhor época={best_epoch} val_loss={best_val:.6f}")
assert best_path.exists(), "modelo não foi salvo — verifique o treino."

# salva histórico
hist_df = pd.DataFrame(history)
hist_csv = RUN_DIR / "training_history.csv"
hist_df.to_csv(hist_csv, index=False, sep=";", encoding="utf-8-sig")
skynet(f"histórico de treino salvo em {hist_csv}")

# gráfico das perdas
plt.figure(figsize=(7,4.2))
plt.plot(hist_df["epoch"], hist_df["train_loss"], label="train")
plt.plot(hist_df["epoch"], hist_df["val_loss"],   label="val")
plt.xlabel("época")
plt.ylabel("mse")
plt.title("autoencoder: curva de perdas")
plt.legend()
plt.grid(True, alpha=0.3)
plot_path = RUN_DIR / "loss_plot.png"
plt.savefig(plot_path, dpi=140, bbox_inches="tight")
plt.close()
skynet(f"gráfico de perdas salvo em {plot_path}")

# carrega melhor estado e calcula erros de reconstrução no conjunto de validação
chk = torch.load(best_path, map_location=device)
model.load_state_dict(chk["model_state"])
model.eval()

with torch.no_grad():
    val_tensor = torch.from_numpy(X_val).to(device)
    recon_val  = model(val_tensor).cpu().numpy()
val_err = np.mean((recon_val - X_val) ** 2, axis=1).astype(np.float32)

# salva distribuição de erros e percentis para apoio a threshold
err_path = RUN_DIR / "reconstruction_errors_val.npy"
np.save(err_path, val_err)

percentis = [50, 75, 90, 95, 97, 99, 99.5, 99.9]
thr = {f"p{p}": float(np.percentile(val_err, p)) for p in percentis}
thr["mean"] = float(np.mean(val_err))
thr["std"]  = float(np.std(val_err))
thr["suggested_threshold"] = thr["p99.5"]  # sugestão inicial (ajuste conforme sua capacidade operacional)

thr_path = RUN_DIR / "thresholds.json"
with open(thr_path, "w", encoding="utf-8") as f:
    json.dump(thr, f, ensure_ascii=False, indent=2)

skynet(f"erros de validação salvos em {err_path}")
skynet(f"thresholds salvos em {thr_path}")
print("\nresumo thresholds (val):")
print(pd.Series(thr))

# salva configuração do modelo
cfg = {
    "input_dim": input_dim,
    "hidden_dims": hidden_dims,
    "dropout_p": dropout_p,
    "batch_size": BATCH_SIZE,
    "lr": LR,
    "max_epochs": MAX_EPOCHS,
    "patience": PATIENCE,
    "min_delta": MIN_DELTA,
    "best_epoch": best_epoch,
    "best_val_loss": best_val,
    "device": str(device),
    "seed": SEED,
}
with open(RUN_DIR / "model_config.json", "w", encoding="utf-8") as f:
    json.dump(cfg, f, ensure_ascii=False, indent=2)
skynet("artefatos do modelo escritos em disco: ae.pt, training_history.csv, loss_plot.png, thresholds.json, model_config.json")

## **Etapa 6:** Calibração e metas

In [None]:
# parágrafo 6: calibração do limiar (threshold) e relatórios de custo / alertas

import numpy as np, pandas as pd, matplotlib.pyplot as plt, json, math
from pathlib import Path

# dependências
assert 'RUN_DIR' in globals() and 'skynet' in globals(), "execute os parágrafos anteriores."
thr_path = RUN_DIR / "thresholds.json"
err_path = RUN_DIR / "reconstruction_errors_val.npy"
assert thr_path.exists() and err_path.exists(), "execute o parágrafo 4 antes."

# carrega erros de reconstrução do conjunto de validação
val_err = np.load(err_path)
with open(thr_path, encoding="utf-8") as f:
    thr_info = json.load(f)

# parâmetros operacionais
CUSTO_FP = 1.0    # custo unitário de um falso positivo (alerta desnecessário)
CUSTO_FN = 25.0   # custo unitário de um falso negativo (fraude/erro não detectado)
ALERTAS_META = 200  # meta operacional de alertas/dia

# função: calcular custo e quantidade de alertas para cada limiar
def calcular_metricas(errs: np.ndarray, limiares: np.ndarray):
    n = len(errs)
    # ordena erros
    errs_sorted = np.sort(errs)
    metrics = []
    for thr in limiares:
        alertas = np.count_nonzero(errs > thr)
        fp_rate = alertas / n
        fn_rate = 1 - fp_rate
        custo = fp_rate * CUSTO_FP + fn_rate * CUSTO_FN
        metrics.append((thr, alertas, fp_rate, fn_rate, custo))
    df = pd.DataFrame(metrics, columns=["threshold", "alertas", "fp_rate", "fn_rate", "custo"])
    return df

# grid de limiares entre p50 e p99.9
low, high = np.percentile(val_err, [50, 99.9])
limiares = np.linspace(low, high, 200)
df_calib = calcular_metricas(val_err, limiares)

# acha o limiar que gera ~ALERTAS_META alertas
target_idx = (df_calib["alertas"] - ALERTAS_META).abs().idxmin()
limiar_meta = float(df_calib.loc[target_idx, "threshold"])

# curva de custo
plt.figure(figsize=(6.8, 4))
plt.plot(df_calib["threshold"], df_calib["custo"], label="custo total")
plt.axvline(limiar_meta, color="r", ls="--", label=f"meta {ALERTAS_META} alertas")
plt.xlabel("threshold")
plt.ylabel("custo relativo")
plt.title("Curva de custo vs. threshold")
plt.legend()
plt.grid(alpha=0.3)
plt.tight_layout()
cost_path = RUN_DIR / "cost_curve.png"
plt.savefig(cost_path, dpi=140)
plt.close()

# curva de alertas
plt.figure(figsize=(6.8, 4))
plt.plot(df_calib["threshold"], df_calib["alertas"])
plt.axhline(ALERTAS_META, color="r", ls="--", label="meta operacional")
plt.xlabel("threshold")
plt.ylabel("quantidade de alertas")
plt.title("Alertas vs. threshold")
plt.legend()
plt.grid(alpha=0.3)
plt.tight_layout()
alerts_path = RUN_DIR / "alerts_vs_threshold.png"
plt.savefig(alerts_path, dpi=140)
plt.close()

# salva relatório completo
calib_csv = RUN_DIR / "calibration_report.csv"
df_calib.to_csv(calib_csv, sep=";", encoding="utf-8-sig", index=False)

# resumo das métricas principais
summary = {
    "threshold_meta": limiar_meta,
    "alertas_meta": int(df_calib.loc[target_idx, "alertas"]),
    "custo_meta": float(df_calib.loc[target_idx, "custo"]),
    "custo_minimo": float(df_calib["custo"].min()),
    "threshold_custo_minimo": float(df_calib.loc[df_calib["custo"].idxmin(), "threshold"]),
}
with open(RUN_DIR / "calibration_summary.json", "w", encoding="utf-8") as f:
    json.dump(summary, f, ensure_ascii=False, indent=2)

skynet(f"calibração concluída. limiar_meta={limiar_meta:.6f} → {summary['alertas_meta']} alertas estimados")
skynet(f"relatórios salvos em: {calib_csv}, {cost_path}, {alerts_path}")
print(pd.Series(summary))

# complemento do parágrafo 6: marcar pontos escolhidos e exibir/salvar figuras

# recupera valores nos pontos de interesse
idx_meta = target_idx
thr_meta = float(df_calib.loc[idx_meta, "threshold"])
custo_meta = float(df_calib.loc[idx_meta, "custo"])
alertas_meta_calc = int(df_calib.loc[idx_meta, "alertas"])

idx_cmin = int(df_calib["custo"].idxmin())
thr_cmin = float(df_calib.loc[idx_cmin, "threshold"])
custo_min = float(df_calib.loc[idx_cmin, "custo"])
alertas_cmin = int(df_calib.loc[idx_cmin, "alertas"])

# 1) curva de custo com marcações
plt.figure(figsize=(7.2, 4.5))
plt.plot(df_calib["threshold"], df_calib["custo"], label="custo total")
plt.axvline(thr_meta, linestyle="--", label=f"threshold_meta = {thr_meta:.6f}")
plt.scatter([thr_meta, thr_cmin], [custo_meta, custo_min], s=60, label="pontos escolhidos")
plt.annotate(f"meta\nc={custo_meta:.2f}", xy=(thr_meta, custo_meta),
             xytext=(10, 10), textcoords="offset points")
plt.annotate(f"mínimo\nc={custo_min:.2f}", xy=(thr_cmin, custo_min),
             xytext=(10, -15), textcoords="offset points")
plt.xlabel("threshold")
plt.ylabel("custo relativo")
plt.title("Curva de custo vs. threshold (com marcações)")
plt.legend()
plt.grid(alpha=0.3)
plt.tight_layout()
cost_marked_path = RUN_DIR / "cost_curve_marked.png"
plt.savefig(cost_marked_path, dpi=140)
plt.show()
skynet(f"curva de custo (marcada) salva em {cost_marked_path}")

# 2) curva de alertas com marcações
plt.figure(figsize=(7.2, 4.5))
plt.plot(df_calib["threshold"], df_calib["alertas"], label="alertas")
plt.axhline(ALERTAS_META, linestyle="--", label=f"meta operacional = {ALERTAS_META}")
plt.scatter([thr_meta, thr_cmin], [alertas_meta_calc, alertas_cmin], s=60, label="pontos escolhidos")
plt.annotate(f"meta\nA={alertas_meta_calc}", xy=(thr_meta, alertas_meta_calc),
             xytext=(10, 10), textcoords="offset points")
plt.annotate(f"mínimo(custo)\nA={alertas_cmin}", xy=(thr_cmin, alertas_cmin),
             xytext=(10, -15), textcoords="offset points")
plt.xlabel("threshold")
plt.ylabel("quantidade de alertas")
plt.title("Alertas vs. threshold (com marcações)")
plt.legend()
plt.grid(alpha=0.3)
plt.tight_layout()
alerts_marked_path = RUN_DIR / "alerts_vs_threshold_marked.png"
plt.savefig(alerts_marked_path, dpi=140)
plt.show()
skynet(f"curva de alertas (marcada) salva em {alerts_marked_path}")

## **Etapa 7:** Pontuação em lote e scores

In [None]:
# parágrafo 7: pontuação em lote com autoencoder treinado e export de scores (versão corrigida)

import os, json, math, pickle
from pathlib import Path
from datetime import datetime
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# dependências anteriores
assert 'RUN_DIR' in globals() and 'skynet' in globals(), "execute os parágrafos anteriores."
assert 'REQUIRED_COLS' in globals() and 'CSV_SEP' in globals() and 'CSV_ENCODING' in globals(), "execute o parágrafo 2 antes."
assert 'carregar_validar_csv' in globals(), "a função de carga/validação do parágrafo 2 é necessária."
artifacts_path = RUN_DIR / "features.pkl"
model_path     = RUN_DIR / "ae.pt"
assert artifacts_path.exists() and model_path.exists(), "artefatos ausentes: finalize os parágrafos 3 e 4."

# carrega artefatos de features e modelo
with open(artifacts_path, "rb") as f:
    feats = pickle.load(f)
FEATURE_COLS = feats["feature_cols"]
imputer      = feats["imputer"]
scaler       = feats["scaler"]

# threshold: usa o da calibração se existir; senão p99.5 do parágrafo 4
thr_use = None
calib_summary = RUN_DIR / "calibration_summary.json"
thr_json      = RUN_DIR / "thresholds.json"
if calib_summary.exists():
    with open(calib_summary, encoding="utf-8") as f:
        thr_use = json.load(f).get("threshold_meta", None)
if thr_use is None and thr_json.exists():
    with open(thr_json, encoding="utf-8") as f:
        tj = json.load(f)
        thr_use = tj.get("suggested_threshold", None)
assert thr_use is not None, "threshold não encontrado; execute o parágrafo 6 (ou use thresholds.json do parágrafo 4)."
thr_use = float(thr_use)

# função: frequency encoding consistente
def _freq_encode(series: pd.Series) -> pd.Series:
    freq = series.value_counts(dropna=False)
    return series.map(freq).astype(float)

# função: estatísticas por grupo (mean/std/median, z, mad)
def _add_group_stats(df: pd.DataFrame, key: str, val_col: str = "valormi"):
    g = df.groupby(key, dropna=False)[val_col].agg(["mean","std","median"]).rename(
        columns={"mean":f"{key}_mean", "std":f"{key}_std", "median":f"{key}_median"}
    )
    df = df.join(g, on=key)
    df[f"{key}_std"] = df[f"{key}_std"].replace(0, np.nan)
    df[f"{key}_z"]   = (df[val_col] - df[f"{key}_mean"]) / df[f"{key}_std"]
    df[f"{key}_mad"] = (df[val_col] - df[f"{key}_median"]).abs()
    return df

# função: construir features exatamente como no parágrafo 3 (com correção .str.strip())
def construir_features(df_raw: pd.DataFrame) -> pd.DataFrame:
    df = df_raw.copy()

    # normalização básica de strings (uso correto de .str.strip())
    for c in df.columns:
        if df[c].dtype == object:
            df[c] = df[c].astype(str).str.strip()

    # domínios padronizados
    df["dc"] = df["dc"].astype(str).str.lower().map({"d":"d","c":"c"})
    df["tipoconta"] = df["tipoconta"].astype(str).str.capitalize()
    # garante tipo numérico para valormi
    df["valormi"] = pd.to_numeric(df["valormi"], errors="coerce").astype(float)

    # decomposições cosif
    df["contacontabil"] = df["contacontabil"].astype(str).str.strip()
    df["cosif_len"] = df["contacontabil"].str.len().clip(upper=12).astype(int)
    df["cosif_p1"]  = df["contacontabil"].str[0]
    df["cosif_p2"]  = df["contacontabil"].str[:2]
    df["cosif_p3"]  = df["contacontabil"].str[:3]

    # transformações de valor
    df["valormi_log1p"] = np.log1p(df["valormi"].clip(lower=0))

    # frequency encoding
    df["freq_username"]      = _freq_encode(df["username"])
    df["freq_lotacao"]       = _freq_encode(df["lotacao"])
    df["freq_contacontabil"] = _freq_encode(df["contacontabil"])
    df["freq_cosif_p2"]      = _freq_encode(df["cosif_p2"])
    df["freq_cosif_p3"]      = _freq_encode(df["cosif_p3"])

    # estatísticas por grupo
    for k in ["username","lotacao","contacontabil","cosif_p2","cosif_p3"]:
        df = _add_group_stats(df, k, "valormi")

    # codificações de baixa cardinalidade
    df["dc_bin"] = df["dc"].map({"d":1,"c":0}).astype("Int64").astype(int)
    tipodummies = pd.get_dummies(df["tipoconta"], prefix="tipo", dtype=int)

    df = pd.concat([df, tipodummies], axis=1)

    # garante presença de todas as colunas esperadas e na mesma ordem do treino
    for c in FEATURE_COLS:
        if c not in df.columns:
            df[c] = np.nan

    df_out = df[FEATURE_COLS].copy()
    return df_out

# carrega modelo pytorch
import torch
from torch import nn

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
chk = torch.load(model_path, map_location=device)

class AutoEncoder(nn.Module):
    def __init__(self, in_dim, h_dims, dropout=0.0):
        super().__init__()
        enc, last = [], in_dim
        for h in h_dims:
            enc += [nn.Linear(last, h), nn.ReLU(), nn.Dropout(dropout)]
            last = h
        self.encoder = nn.Sequential(*enc)
        dec, rev = [], list(reversed(h_dims))
        last = rev[0]
        for h in rev[1:]:
            dec += [nn.Linear(last, h), nn.ReLU(), nn.Dropout(dropout)]
            last = h
        dec += [nn.Linear(last, in_dim)]
        self.decoder = nn.Sequential(*dec)
    def forward(self, x):
        z = self.encoder(x)
        y = self.decoder(z)
        return y

model = AutoEncoder(chk["input_dim"], chk["hidden_dims"], dropout=chk["dropout_p"]).to(device)
model.load_state_dict(chk["model_state"])
model.eval()
skynet(f"modelo carregado para pontuação (device={device})")

# seleção do arquivo a pontuar: reusar o selecionado ou escolher outro
use_same = input("pressione enter para usar o CSV já selecionado; ou digite 'novo' para escolher outro: ").strip().lower()
if use_same == "novo":
    assert 'listar_csvs_mydrive' in globals(), "a função listar_csvs_mydrive do parágrafo 2 é necessária."
    csvs = listar_csvs_mydrive(max_files=2000)
    print("\narquivos csv encontrados (índice, tamanho, caminho):")
    for i, (p, mtime, sz) in enumerate(csvs):
        print(f"[{i:03d}] {sz/1024/1024:6.2f}MB  {str(p)}")
    sel = int(input("\ndigite o índice do csv desejado: ").strip())
    CSV_TO_SCORE = Path(csvs[sel][0])
else:
    assert 'SELECTED_CSV' in globals(), "não há CSV selecionado anteriormente; escolha 'novo'."
    CSV_TO_SCORE = Path(SELECTED_CSV)

skynet(f"pontuando arquivo: {CSV_TO_SCORE}")

# carrega e valida o csv
DF_SCORE_SRC = carregar_validar_csv(CSV_TO_SCORE)

# remove linhas com falhas críticas como no parágrafo 3
mask_ok_score = (
    DF_SCORE_SRC["username"].notna() &
    DF_SCORE_SRC["lotacao"].notna() &
    DF_SCORE_SRC["tipoconta"].notna() &
    DF_SCORE_SRC["dc"].isin({"d","c"}) &
    DF_SCORE_SRC["contacontabil"].astype(str).str.fullmatch(r"\d+") &
    DF_SCORE_SRC["valormi"].notna() & (DF_SCORE_SRC["valormi"] >= 0)
)
removed_score = int((~mask_ok_score).sum())
DF_SCORE = DF_SCORE_SRC.loc[mask_ok_score].reset_index(drop=True)
skynet(f"linhas removidas nesta pontuação: {removed_score}")

# constroi features e aplica imputer/scaler treinados
X_feat = construir_features(DF_SCORE)
X_imp  = imputer.transform(X_feat.values)
X_std  = scaler.transform(X_imp)

# inferência
with torch.no_grad():
    X_tensor = torch.from_numpy(X_std.astype(np.float32)).to(device)
    X_recon  = model(X_tensor).cpu().numpy()

# erro de reconstrução (mse por amostra) e contribuições por feature (erro quadrático)
err_vec = np.mean((X_recon - X_std) ** 2, axis=1).astype(np.float32)
contrib_mat = (X_recon - X_std) ** 2  # mesma escala das features padronizadas
contrib_cols = FEATURE_COLS

# top-k contribuições por amostra (nomes das variáveis)
TOPK = 5
topk_idx = np.argsort(contrib_mat, axis=1)[:, ::-1][:, :TOPK]
topk_names = [[contrib_cols[j] for j in row] for row in topk_idx]

# flag por limiar
flags = (err_vec > thr_use).astype(int)

# compõe dataframe de saída
OUT = DF_SCORE[["username","lotacao","tipoconta","valormi","dc","contacontabil"]].copy()
OUT["recon_error"] = err_vec
OUT["flag_threshold"] = flags
OUT["threshold_used"] = thr_use
for k in range(TOPK):
    OUT[f"top{k+1}"] = [names[k] for names in topk_names]

# salva csv de scores
scores_path = RUN_DIR / "scores.csv"
OUT.to_csv(scores_path, sep=";", encoding="utf-8-sig", index=False)
skynet(f"scores salvos em {scores_path}")

# salva contribuições médias por feature (para visão global)
mean_contrib = contrib_mat.mean(axis=0)
CONTRIB_DF = pd.DataFrame({"feature": contrib_cols, "mean_contrib": mean_contrib})
CONTRIB_DF.sort_values("mean_contrib", ascending=False, inplace=True)
contrib_path = RUN_DIR / "feature_contributions_mean.csv"
CONTRIB_DF.to_csv(contrib_path, sep=";", encoding="utf-8-sig", index=False)
skynet(f"contribuições médias por feature salvas em {contrib_path}")

# histograma de erros com linha de threshold
plt.figure(figsize=(7.0, 4.4))
plt.hist(err_vec, bins=60, alpha=0.85)
plt.axvline(thr_use, linestyle="--", label=f"threshold={thr_use:.6f}")
plt.xlabel("erro de reconstrução")
plt.ylabel("frequência")
plt.title("distribuição do erro de reconstrução (batch scoring)")
plt.legend()
plt.grid(alpha=0.3)
hist_path = RUN_DIR / "scoring_error_hist.png"
plt.savefig(hist_path, dpi=140, bbox_inches="tight")
plt.show()
skynet(f"histograma de erros salvo em {hist_path}")

# imprime amostra dos top-n suspeitos
TOPN_PRINT = 15
rank = OUT.sort_values("recon_error", ascending=False).head(TOPN_PRINT)
print("\nTop suspeitos por erro de reconstrução:")
print(rank[["username","lotacao","tipoconta","valormi","dc","contacontabil","recon_error","flag_threshold","top1","top2","top3"]])

## **Etapa 8:** Monitoramento de drift e drift do erro

In [None]:
# parágrafo 8: monitoramento de drift (PSI por feature) e drift do erro (PSI + KS)

import os, json, math, warnings
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# dependências dos parágrafos 3, 4 e 7
assert 'RUN_DIR' in globals() and 'skynet' in globals(), "execute os parágrafos anteriores."
artifacts_path = RUN_DIR / "features.pkl"
npz_path       = RUN_DIR / "dataset_npz.npz"            # contém X_train/X_val (padronizados)
val_err_path   = RUN_DIR / "reconstruction_errors_val.npy"  # erros da validação (par. 4)
scores_path    = RUN_DIR / "scores.csv"                 # resultado da pontuação (par. 7)

assert artifacts_path.exists() and npz_path.exists() and val_err_path.exists(), \
    "artefatos ausentes: finalize os parágrafos 3 e 4."
assert scores_path.exists(), "scores.csv ausente. finalize o parágrafo 7."

with open(artifacts_path, "rb") as f:
    feats = pickle.load(f)
FEATURE_COLS = feats["feature_cols"]
imputer      = feats["imputer"]
scaler       = feats["scaler"]

# baseline (treino) para PSI de features
with np.load(npz_path) as npz:
    X_train_base = npz["X_train"]  # já padronizado

# erros baseline (validação do treino) para drift de erro
err_val_base = np.load(val_err_path)

# ----- recuperar features padronizadas do LOTE ATUAL -----
# Se X_std e DF_SCORE ainda estiverem em memória (par.7), ótimo; caso contrário, reconstruir
def _reconst_features_from_source():
    # precisamos do mesmo CSV do par.7; como fallback, usamos o DF reconstruído de scores.csv
    # (scores.csv não contém features; então pedimos para escolher um CSV novamente, garantindo consistência)
    assert 'carregar_validar_csv' in globals() and 'construir_features' in globals(), \
        "funções do parágrafo 2 e 7 são necessárias."
    print("\nreconstruindo features: selecione novamente o CSV a monitorar.")
    # reutiliza a listagem do par.2
    assert 'listar_csvs_mydrive' in globals(), "a função listar_csvs_mydrive do par.2 é necessária."
    csvs = listar_csvs_mydrive(max_files=2000)
    for i, (p, mtime, sz) in enumerate(csvs):
        print(f"[{i:03d}] {sz/1024/1024:6.2f}MB  {str(p)}")
    sel = int(input("\ndigite o índice do csv desejado: ").strip())
    csv_path = Path(csvs[sel][0])
    df_src = carregar_validar_csv(csv_path)
    # filtra linhas críticas como no par.3/7
    mask_ok = (
        df_src["username"].notna() &
        df_src["lotacao"].notna() &
        df_src["tipoconta"].notna() &
        df_src["dc"].isin({"d","c"}) &
        df_src["contacontabil"].astype(str).str.fullmatch(r"\d+") &
        df_src["valormi"].notna() & (df_src["valormi"] >= 0)
    )
    df_use = df_src.loc[mask_ok].reset_index(drop=True)
    X_feat = construir_features(df_use)
    X_imp  = imputer.transform(X_feat.values)
    X_std  = scaler.transform(X_imp)
    return X_std

if 'X_std' in globals():
    X_curr = X_std
else:
    X_curr = _reconst_features_from_source()

# ----- utilitários de PSI/KS -----
def _bin_edges_from_base(base_vals: np.ndarray, n_bins: int = 10):
    # cria bins por quantis no baseline; garante bordas únicas (jitter se necessário)
    qs = np.linspace(0, 1, n_bins + 1)
    edges = np.unique(np.quantile(base_vals, qs))
    # se todas iguais (variância zero), retorna None
    if len(edges) <= 2:
        return None
    return edges

def _hist_proportions(vals: np.ndarray, edges: np.ndarray):
    hist, _ = np.histogram(vals, bins=edges)
    props = hist.astype(float) / max(1, vals.shape[0])
    # suavização mínima para evitar log(0) no PSI
    eps = 1e-6
    props = np.clip(props, eps, 1.0)
    return props

def psi(base: np.ndarray, curr: np.ndarray, n_bins: int = 10) -> float:
    """
    PSI padrão por bins definidos a partir do baseline.
    """
    edges = _bin_edges_from_base(base, n_bins)
    if edges is None:
        return 0.0
    p = _hist_proportions(base, edges)
    q = _hist_proportions(curr, edges)
    return float(np.sum((p - q) * np.log(p / q)))

def ks_2sample(a: np.ndarray, b: np.ndarray):
    """
    KS 2-amostras sem SciPy (ECDF discreta).
    Retorna (D, p_value_approx) com aproximação de p.
    """
    a = np.sort(a)
    b = np.sort(b)
    n, m = len(a), len(b)
    i = j = 0
    d = 0.0
    while i < n and j < m:
        if a[i] <= b[j]:
            i += 1
        else:
            j += 1
        fa = i / n
        fb = j / m
        d = max(d, abs(fa - fb))
    # aproximação de p-value (Massey, 1951)
    en = np.sqrt(n * m / (n + m))
    p = 2.0 * np.exp(-2.0 * (d * en) ** 2)
    p = float(np.clip(p, 0.0, 1.0))
    return float(d), p

# ----- PSI por feature -----
skynet("calculando PSI por feature (baseline = X_train, atual = lote pontuado)")
psi_list = []
# limita a quantidade de colunas para gráficos (mas calcula PSI para todas)
for j, col in enumerate(FEATURE_COLS):
    base_col = X_train_base[:, j]
    curr_col = X_curr[:, j]
    try:
        score = psi(base_col, curr_col, n_bins=10)
    except Exception:
        score = np.nan
    psi_list.append((col, float(score)))

PSI_DF = pd.DataFrame(psi_list, columns=["feature", "psi"]).sort_values("psi", ascending=False)
psi_csv = RUN_DIR / "monitor_psi_features.csv"
PSI_DF.to_csv(psi_csv, sep=";", encoding="utf-8-sig", index=False)
skynet(f"PSI por feature salvo em {psi_csv}")

# ----- Drift do erro de reconstrução -----
# Reusa erro do lote atual se disponível do par.7; senão, estima novamente a partir do scores.csv (não contém erro por si só).
if 'err_vec' in globals():
    err_curr = err_vec
else:
    # tentar recuperar do histograma salvo não é possível; então avisar
    warnings.warn("err_vec não encontrado em memória; reexecute o parágrafo 7 para popular err_vec para análise completa.")
    # fallback: abortar seção de erro se não houver err_vec
    err_curr = None

drift_report = {}
if err_curr is not None and len(err_curr) > 0:
    psi_err = psi(err_val_base, err_curr, n_bins=20)
    ks_D, ks_p = ks_2sample(err_val_base, err_curr)
    drift_report = {"psi_error": float(psi_err), "ks_D": float(ks_D), "ks_p_approx": float(ks_p)}

    # gráficos: hist overlay de erros
    plt.figure(figsize=(7.2, 4.6))
    plt.hist(err_val_base, bins=60, alpha=0.55, label="val (baseline)")
    plt.hist(err_curr,     bins=60, alpha=0.55, label="lote atual")
    plt.xlabel("erro de reconstrução")
    plt.ylabel("frequência")
    plt.title("Distribuição do erro de reconstrução: baseline vs lote")
    plt.legend()
    plt.grid(alpha=0.3)
    err_hist_path = RUN_DIR / "error_hist_overlay.png"
    plt.savefig(err_hist_path, dpi=140, bbox_inches="tight")
    plt.show()
    skynet(f"histograma de erro (overlay) salvo em {err_hist_path}")
else:
    skynet("aviso: err_vec indisponível; pulei gráficos e métricas de drift do erro.")

# ----- Gráfico: Top-k features com maior PSI -----
TOPK = 20
top_df = PSI_DF.head(TOPK)
plt.figure(figsize=(8.8, max(4.0, 0.3 * TOPK)))
plt.barh(top_df["feature"][::-1], top_df["psi"][::-1])
plt.xlabel("PSI")
plt.title(f"Top {TOPK} features com maior PSI (baseline X_train vs lote)")
plt.grid(axis="x", alpha=0.3)
psi_bar_path = RUN_DIR / "psi_bar_top.png"
plt.tight_layout()
plt.savefig(psi_bar_path, dpi=140)
plt.show()
skynet(f"gráfico de barras do PSI salvo em {psi_bar_path}")

# ----- Salvamento do relatório de drift do erro -----
if drift_report:
    drift_json = RUN_DIR / "monitor_error_drift.json"
    with open(drift_json, "w", encoding="utf-8") as f:
        json.dump(drift_report, f, ensure_ascii=False, indent=2)
    print("\nresumo drift do erro (baseline val vs lote):")
    print(pd.Series(drift_report))
    skynet(f"relatório de drift do erro salvo em {drift_json}")

# ----- Sinalizadores práticos -----
# heurísticas comuns de PSI:
#   < 0.1: estável; 0.1–0.25: atenção; > 0.25: shift relevante
flags = {
    "num_features_psi_gt_0_25": int((PSI_DF["psi"] > 0.25).sum()),
    "num_features_psi_gt_0_10": int((PSI_DF["psi"] > 0.10).sum()),
    "max_feature_psi": float(PSI_DF["psi"].max() if len(PSI_DF) else np.nan),
    "max_feature_name": str(PSI_DF.iloc[0]["feature"] if len(PSI_DF) else ""),
}
with open(RUN_DIR / "monitor_flags.json", "w", encoding="utf-8") as f:
    json.dump(flags, f, ensure_ascii=False, indent=2)
print("\nresumo de flags de estabilidade (PSI):")
print(pd.Series(flags))
skynet("monitoramento concluído: PSI por feature, drift do erro e flags salvos no RUN_DIR")

### **Complemento** Análise gráfica do drift

In [None]:
# complemento interpretativo do parágrafo 8: histogramas e ECDF com marcação do KS

import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path

# pré-requisitos: err_val_base (do par.8) e err_curr (do par.7) em memória
if 'err_val_base' not in globals():
    err_val_base = np.load(RUN_DIR / "reconstruction_errors_val.npy")
if 'err_curr' not in globals():
    raise RuntimeError("err_curr indisponível. execute o parágrafo 7 para calcular os erros do lote atual (err_vec).")

# função utilitária para ECDF (degrau)
def _ecdf(x: np.ndarray):
    x = np.sort(np.asarray(x))
    y = np.arange(1, len(x) + 1) / len(x)
    return x, y

# calcula ECDFs
x1, F1 = _ecdf(err_val_base)
x2, F2 = _ecdf(err_curr)

# constrói grade comum de x (todos os pontos observados)
x_all = np.sort(np.unique(np.concatenate([x1, x2])))

# valores de F1 e F2 sobre a grade comum (passo à esquerda)
def _step_at(x_grid, x_vals, F_vals):
    # para cada x_grid, pega F(x) = proporção de pontos <= x_grid
    idx = np.searchsorted(x_vals, x_grid, side="right") - 1
    idx = np.clip(idx, -1, len(F_vals) - 1)
    out = np.where(idx >= 0, F_vals[idx], 0.0)
    return out

F1g = _step_at(x_all, x1, F1)
F2g = _step_at(x_all, x2, F2)

# estatística KS e ponto de maior divergência
diff = np.abs(F1g - F2g)
ks_D = float(diff.max())
argmax = int(diff.argmax())
x_star = float(x_all[argmax])

# 1) histograma comparativo (normalizado) com anotação KS
plt.figure(figsize=(7.8, 4.8))
plt.hist(err_val_base, bins=60, alpha=0.55, density=True, label="validação (baseline)")
plt.hist(err_curr,     bins=60, alpha=0.55, density=True, label="lote atual")
plt.axvline(x_star, linestyle="--", label=f"x* (KS) ≈ {x_star:.6f}")
plt.title("erro de reconstrução — distribuição (baseline vs. lote)")
plt.xlabel("erro de reconstrução")
plt.ylabel("densidade")
plt.legend()
plt.grid(alpha=0.3)
hist_interp_path = RUN_DIR / "error_drift_interpret_hist.png"
plt.tight_layout()
plt.savefig(hist_interp_path, dpi=140)
plt.show()
skynet(f"gráfico interpretativo (hist): {hist_interp_path}")

# 2) ECDFs com marcação visual do KS (distância máxima vertical)
plt.figure(figsize=(7.8, 4.8))
plt.step(x_all, F1g, where="post", label="ECDF val (baseline)")
plt.step(x_all, F2g, where="post", label="ECDF lote atual")

# marca o ponto de maior divergência
plt.axvline(x_star, linestyle="--", alpha=0.8)
# desenha o segmento vertical da distância KS
y1_star = F1g[argmax]
y2_star = F2g[argmax]
y_low, y_high = sorted([y1_star, y2_star])
plt.vlines(x_star, y_low, y_high, linewidth=3, alpha=0.9, label=f"KS = {ks_D:.3f}")

plt.title("erro de reconstrução — ECDFs e distância KS")
plt.xlabel("erro de reconstrução")
plt.ylabel("F(x)")
plt.legend()
plt.grid(alpha=0.3)
ecdf_interp_path = RUN_DIR / "error_drift_interpret_ecdf.png"
plt.tight_layout()
plt.savefig(ecdf_interp_path, dpi=140)
plt.show()
skynet(f"gráfico interpretativo (ecdf): {ecdf_interp_path}")

# explicação rápida impressa
print(
    f"\ninterpretação rápida:\n"
    f"- KS = {ks_D:.3f} é a maior distância vertical entre as ECDFs no ponto x* ≈ {x_star:.6f}.\n"
    f"- quanto maior o KS, maior a evidência de que as distribuições de erro mudaram.\n"
    f"- se KS estiver próximo de 1.0, as curvas quase não se sobrepõem; se perto de 0, são muito similares."
)