#**Licença de Uso**


This repository uses a **dual-license model** to distinguish between source code and creative/documental content.

**Code** (Python scripts, modules, utilities):
Licensed under the MIT License.

→ You may freely use, modify, and redistribute the code, including for commercial purposes, provided that you preserve the copyright notice.

**Content** (Jupyter notebooks, documentation, reports, datasets, and generated outputs):
Licensed under the Creative Commons Attribution–NonCommercial 4.0 International License.

→ You may share and adapt the content for non-commercial purposes, provided that proper credit is given to the original author.


**© 2025 Leandro Bernardo Rodrigues**


#**Pré-Configuração**

##**Código de uso único**
Aplicação persistente entre sessões do Google Colab

---
**Uso expecífico para Google Colab.**

**Aviso:** implementação no JupytherHub e GitLab são diferentes.

In [None]:
# @title
#re-clone limpo, configurações git/nbdime/jupytext, pull/rebase e push com fallback para PAT
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

import os, sys, subprocess, getpass, shutil, pathlib, time

#parâmetros do projeto e remoto
BASE = "/content/drive/MyDrive/Notebooks"
REPO = "data-analysis"
PROJ = f"{BASE}/{REPO}"
GITHUB_OWNER = "LeoBR84p"
GITHUB_REPO  = "data-analysis"
CLEAN_URL    = f"https://github.com/{GITHUB_OWNER}/{GITHUB_REPO}.git"

os.makedirs(BASE, exist_ok=True)

def run(cmd, cwd=None, check=True, capture=False):
    #wrapper para subprocess.run com echo do diretório
    print(f"\n$ (pwd={cwd or os.getcwd()})", " ".join(cmd))
    return subprocess.run(
        cmd,
        cwd=cwd,
        check=check,
        text=True,
        capture_output=capture
    )

#garantir que estamos em BASE
%cd "$BASE"
print("pwd(BASE):", os.getcwd())

#se já existe a pasta do projeto, mover para backup com timestamp
if os.path.exists(PROJ):
    ts = time.strftime("%Y%m%d_%H%M%S")
    backup = f"{PROJ}_backup_{ts}"
    print(f"pasta {PROJ} já existe, movendo para {backup}")
    shutil.move(PROJ, backup)

#re-clonar limpo
print(f"clonando repositório em {PROJ}…")
run(["git", "clone", CLEAN_URL], cwd=BASE)

#entrar no repositório
%cd "$PROJ"
print("pwd(PROJ):", os.getcwd())

#configurar git no repositório (escopo local)
run(["git", "config", "--local", "user.name", "Leandro Bernardo Rodrigues"], cwd=PROJ)
run(["git", "config", "--local", "user.email", "bernardo.leandro@gmail.com"], cwd=PROJ)
run(["git", "config", "--local", "init.defaultBranch", "main"], cwd=PROJ)

#criar pastas utilitárias (idempotente)
for p in ["notebooks", "src", "data", "output", "runs", "configs"]:
    os.makedirs(os.path.join(PROJ, p), exist_ok=True)

#instalar ferramentas úteis nesta sessão
!pip -q install jupytext nbdime nbstripout

#habilitar nbdime no git global (no colab --local costuma falhar)
run(["nbdime", "config-git", "--enable", "--global"])

#criar .gitignore e .gitattributes se não existirem
gi = pathlib.Path(PROJ) / ".gitignore"
ga = pathlib.Path(PROJ) / ".gitattributes"
if not gi.exists():
    gi.write_text("""\
.ipynb_checkpoints/
.DS_Store
Thumbs.db
*.log
*.tmp
# dados/artefatos pesados (não versionar)
data/
output/
runs/
# python
venv/
.venv/
__pycache__/
*.pyc
# segredos
.env
*.key
*.pem
*.tok
""", encoding="utf-8")
if not ga.exists():
    ga.write_text("*.ipynb filter=nbstripout\n", encoding="utf-8")

#ativar hook do nbstripout neste repositório
run(["nbstripout", "--install", "--attributes", ".gitattributes"], cwd=PROJ)

#parear notebooks com .py (se existirem)
notebooks_glob = os.path.join(PROJ, "notebooks", "*.ipynb")
run(["bash", "-lc", f"jupytext --set-formats ipynb,py:percent --sync {notebooks_glob} || true"], cwd=PROJ)

#commit e pull --rebase para alinhar
run(["git", "add", "-A"], cwd=PROJ)
run(["git", "status"], cwd=PROJ, check=False)
run(["git", "commit", "-m", "chore: setup local (.gitignore/.gitattributes, nbstripout, jupytext config)"], cwd=PROJ, check=False)
run(["git", "pull", "--rebase", "origin", "main"], cwd=PROJ, check=False)

#push com fallback para PAT
print("\nTentando push sem credenciais…")
push = subprocess.run(["git","push","origin","main"], cwd=PROJ, capture_output=True, text=True)
if push.returncode == 0:
    print("push concluído sem PAT.")
else:
    print("primeiro push falhou. usando PAT…")
    token = getpass.getpass("cole seu GitHub PAT (não será exibido): ").strip()
    username = GITHUB_OWNER
    auth_url = f"https://{username}:{token}@github.com/{GITHUB_OWNER}/{GITHUB_REPO}.git"
    #testar credenciais
    test = subprocess.run(["git","ls-remote", auth_url], cwd=PROJ, capture_output=True, text=True)
    if test.returncode != 0:
        print("falha ao autenticar com o PAT:")
        print(test.stderr or test.stdout)
        raise SystemExit(1)
    #trocar remote para URL autenticada, push e restaurar
    try:
        run(["git","remote","set-url","origin", auth_url], cwd=PROJ)
        out = subprocess.run(["git","push","origin","main"], cwd=PROJ, capture_output=True, text=True)
        if out.returncode != 0:
            print("falha no push mesmo com PAT:\n", out.stderr or out.stdout)
            raise SystemExit(out.returncode)
        print("push concluído com PAT.")
    finally:
        subprocess.run(["git","remote","set-url","origin", CLEAN_URL], cwd=PROJ)

Espelhamento da estrutura entre GitHub e Colab (assumindo GitHub correto)

**Atenção:** Apaga todos os arquivos do Drive que não se encontram no GitHub

In [None]:
#@title
#sincronizar uma vez (GitHub -> Google Drive)
import os, shutil, subprocess, sys, getpass, json, re, time
from pathlib import Path

# ---- CONFIGURAÇÃO PELO USUÁRIO ----
REPO_URL     = "https://github.com/LeoBR84p/data-analysis.git"  # <== coloque sua URL https do GitHub aqui
BRANCH       = "main"  # branch padrão, altere se necessário
DRIVE_TARGET = "/content/drive/MyDrive/Notebooks/data-analysis"  # <== caminho destino no Drive, ex: "/content/drive/MyDrive/meu-repo"

# ---- MONTAR DRIVE ----
try:
    from google.colab import drive  # type: ignore
    drive.mount('/content/drive')
except Exception as e:
    print("Aviso: não parece ser o Google Colab ou o Drive já está montado.", e)

# ---- VALIDAÇÕES ----
assert REPO_URL.strip(), "Defina REPO_URL com a URL do seu repositório GitHub."
assert DRIVE_TARGET.strip(), "Defina DRIVE_TARGET com a pasta destino no Drive."

TARGET = Path(DRIVE_TARGET).expanduser().absolute()
TARGET.mkdir(parents=True, exist_ok=True)

TMP_DIR = Path("/content/_tmp_clone")
if TMP_DIR.exists():
    shutil.rmtree(TMP_DIR)
TMP_DIR.mkdir(parents=True, exist_ok=True)

def run(cmd, cwd=None, check=True, capture=False):
    """Executa comando e retorna (rc, out)."""
    result = subprocess.run(
        cmd, cwd=cwd, check=False,
        stdout=subprocess.PIPE if capture else None,
        stderr=subprocess.STDOUT if capture else None,
        text=True
    )
    if check and result.returncode != 0:
        print(result.stdout or "")
        raise RuntimeError(f"Falha ao executar: {' '.join(cmd)} (rc={result.returncode})")
    return result.returncode, (result.stdout or "")

print("Clonando repositório em modo raso (depth=1)...")
run(["git", "clone", "--depth", "1", "--branch", BRANCH, REPO_URL, str(TMP_DIR)])

SRC = TMP_DIR  # pasta com o conteúdo do repositório

IGNORE_NAMES = {".git", "__pycache__", ".ipynb_checkpoints", ".DS_Store", "Thumbs.db"}

def should_ignore(path: Path) -> bool:
    name = path.name
    if name in IGNORE_NAMES:
        return True
    # ignorar .pyc e pastas de cache do Python
    if name.endswith(".pyc"):
        return True
    return False

def sync_dirs(src: Path, dst: Path):
    """Torna dst idêntico a src: copia novos/alterados e remove o que não existe em src."""
    # 1) Copiar/atualizar arquivos de src -> dst
    for root, dirs, files in os.walk(src):
        root_p = Path(root)
        rel = root_p.relative_to(src)
        dst_root = dst / rel
        dst_root.mkdir(parents=True, exist_ok=True)

        # filtrar dirs ignorados (não desce neles)
        dirs[:] = [d for d in dirs if not should_ignore(Path(d))]

        for f in files:
            fpath = root_p / f
            if should_ignore(fpath):
                continue
            dpath = dst_root / f
            # copiar se não existe ou se o conteúdo mudou (tamanho ou mtime)
            if not dpath.exists() or os.path.getsize(fpath) != os.path.getsize(dpath) \
               or abs(os.path.getmtime(fpath) - os.path.getmtime(dpath)) > 1.0:
                dpath.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy2(fpath, dpath)

    # 2) Remover do dst tudo que não está em src
    src_set = set()
    for root, dirs, files in os.walk(src):
        root_p = Path(root)
        rel = root_p.relative_to(src)
        for d in dirs:
            p = (rel / d)
            if not should_ignore(Path(d)):
                src_set.add(("d", p.as_posix()))
        for f in files:
            if should_ignore(Path(f)):
                continue
            p = (rel / f)
            src_set.add(("f", p.as_posix()))

    for root, dirs, files in os.walk(dst, topdown=False):
        root_p = Path(root)
        rel = root_p.relative_to(dst)

        for f in files:
            p = (rel / f).as_posix()
            if ("f", p) not in src_set:
                fp = root_p / f
                if not should_ignore(fp):
                    try:
                        fp.unlink()
                    except Exception:
                        pass

        for d in dirs:
            p = (rel / d).as_posix()
            dp = root_p / d
            if ("d", p) not in src_set and dp.exists():
                # não remover a raiz alvo e nem diretórios ignorados
                if not should_ignore(dp):
                    try:
                        shutil.rmtree(dp)
                    except Exception:
                        pass

print(f"Sincronizando para {TARGET} ...")
sync_dirs(SRC, TARGET)
print("Concluído! A pasta no Drive agora espelha o conteúdo do branch do GitHub.")

# limpeza temporária
try:
    shutil.rmtree(TMP_DIR)
except Exception:
    pass

##**--> Código a cada sessão**
---
Aplicação não persistente entre sessões.

Necessário para sincronização e versionamento de alterações no código.

###**Montar e sincronizar**

In [None]:
# @title
#setup por sessão (colab)
from google.colab import drive
import os, time, subprocess, getpass, pathlib, sys

BASE = "/content/drive/MyDrive/Notebooks"
REPO = "data-analysis"
PROJ = f"{BASE}/{REPO}"

def safe_mount_google_drive():
    #monta ou remonta o google drive de forma resiliente
    try:
        drive.mount('/content/drive', force_remount=True)
    except Exception:
        try:
            drive.flush_and_unmount()
        except Exception:
            pass
        time.sleep(1.0)
        drive.mount('/content/drive', force_remount=True)

def safe_chdir(path):
    #usa os.chdir (evita %cd com f-string)
    if not os.path.exists(path):
        raise FileNotFoundError(f"Caminho não existe: {path}")
    os.chdir(path)
    print("Diretório atual:", os.getcwd())

def branch_a_frente():
    #retorna true se head está à frente do upstream (há o que enviar)
    ahead = subprocess.run(
        ["git","rev-list","--left-right","--count","HEAD...@{upstream}"],
        capture_output=True, text=True
    )
    if ahead.returncode != 0:
        status = subprocess.run(["git","status","-sb"], capture_output=True, text=True)
        return "ahead" in (status.stdout or "")
    left_right = (ahead.stdout or "").strip().split()
    return len(left_right) == 2 and left_right[0].isdigit() and int(left_right[0]) > 0

def push_seguro(owner="LeoBR84p", repo="temporal-graph-network", username="LeoBR84p"):
    #realiza push usando pat em memória; restaura url limpa ao final
    clean_url = f"https://github.com/{owner}/{repo}.git"
    token = getpass.getpass("Cole seu GitHub PAT (Contents: Read and write): ").strip()
    auth_url = f"https://{username}:{token}@github.com/{owner}/{repo}.git"
    test = subprocess.run(["git","ls-remote", auth_url], capture_output=True, text=True)
    if test.returncode != 0:
        print("Falha na autenticação (read). Revise token/permissões:")
        print(test.stderr or test.stdout); return
    try:
        subprocess.run(["git","remote","set-url","origin", auth_url], check=True)
        out = subprocess.run(["git","push","origin","main"], capture_output=True, text=True)
        if out.returncode != 0:
            print("Falha no push (write). Revise permissões do token:")
            print(out.stderr or out.stdout)
        else:
            print("Push concluído com PAT.")
    finally:
        subprocess.run(["git","remote","set-url","origin", clean_url], check=False)

#montar/remontar o google drive e entrar no projeto
safe_mount_google_drive()
os.makedirs(BASE, exist_ok=True)
if not os.path.exists(PROJ):
    print(f"Atenção: pasta do projeto não encontrada em {PROJ}. "
          "Execute seu bloco de configuração única (clone) primeiro.")
else:
    print("Pasta do projeto encontrada.")
safe_chdir(PROJ)

#sanity check do repositório git
if not os.path.isdir(".git"):
    print("Aviso: esta pasta não parece ser um repositório Git (.git ausente). "
          "Rode o bloco de configuração única.")
else:
    print("Repositório Git detectado.")

#instalar dependências efêmeras desta sessão
!pip -q install jupytext nbdime nbstripout
!nbdime config-git --enable --global

#atualizar do remoto
!git fetch origin
!git pull --rebase origin main

#sincronizar notebooks → .py (jupytext)
!jupytext --sync notebooks/*.ipynb || true

#ciclo de versionamento do dia (commit genérico opcional)
!git add -A
!git status
!git commit -m "feat: ajustes no notebook X e pipeline Y" || true

#push somente se houver commits locais à frente; com fallback para pat
if branch_a_frente():
    out = subprocess.run(["git","push","origin","main"], capture_output=True, text=True)
    if out.returncode == 0:
        print("Push concluído sem PAT.")
    else:
        print("Push sem PAT falhou. Chamando push_seguro()…")
        push_seguro()
else:
    print("Nada para enviar (branch sincronizada com o remoto).")

###**Utilitários Git**

In [None]:
# @title
#helpers de git: push seguro, commit customizado e tag de release
import subprocess, getpass

#ajuste se você mudar o nome do repositório/usuário
OWNER = "LeoBR84p"
REPO = "data-analysis"
USERNAME = "LeoBR84p"
BRANCH = "main"
REMOTE = "origin"

def branch_a_frente():
    #retorna true se head está à frente do upstream (há o que enviar)
    out = subprocess.run(
        ["git","rev-list","--left-right","--count",f"HEAD...@{{upstream}}"],
        capture_output=True, text=True
    )
    if out.returncode != 0:
        st = subprocess.run(["git","status","-sb"], capture_output=True, text=True)
        return "ahead" in (st.stdout or "")
    left_right = (out.stdout or "").strip().split()
    return len(left_right) == 2 and left_right[0].isdigit() and int(left_right[0]) > 0

def push_seguro(owner=OWNER, repo=REPO, username=USERNAME, remote=REMOTE, branch=BRANCH):
    #realiza push usando pat em memória; restaura url limpa ao final
    clean_url = f"https://github.com/{owner}/{repo}.git"
    token = getpass.getpass("cole seu github pat (contents: read and write): ").strip()
    auth_url = f"https://{username}:{token}@github.com/{owner}/{repo}.git"
    test = subprocess.run(["git","ls-remote", auth_url], capture_output=True, text=True)
    if test.returncode != 0:
        print("falha na autenticação (read). revise token/permissões:")
        print(test.stderr or test.stdout); return False
    try:
        subprocess.run(["git","remote","set-url", remote, auth_url], check=True)
        out = subprocess.run(["git","push", remote, branch], capture_output=True, text=True)
        if out.returncode != 0:
            print("falha no push (write). revise permissões do token:")
            print(out.stderr or out.stdout); return False
        print("push concluído com pat.")
        return True
    finally:
        subprocess.run(["git","remote","set-url", remote, clean_url], check=False)

def try_push_branch(remote=REMOTE, branch=BRANCH):
    #tenta push direto da branch atual
    out = subprocess.run(["git","push", remote, branch], capture_output=True, text=True)
    if out.returncode == 0:
        print("push concluído.")
        return True
    print("push sem credencial falhou:")
    print((out.stderr or out.stdout).strip())
    return False

def try_push_tag(tag, remote=REMOTE):
    #tenta enviar somente a tag
    out = subprocess.run(["git","push", remote, tag], capture_output=True, text=True)
    if out.returncode == 0:
        print(f"tag enviada: {tag}")
        return True
    print("falha ao enviar a tag:")
    print((out.stderr or out.stdout).strip())
    return False

def commit_custom(msg: str, auto_push: bool = True):
    #adiciona tudo, cria commit com a mensagem informada e faz push opcional (com fallback para pat)
    subprocess.run(["git","add","-A"], check=False)
    com = subprocess.run(["git","commit","-m", msg], capture_output=True, text=True)
    if com.returncode != 0:
        print((com.stderr or com.stdout or "nada para commitar.").strip())
        return
    print(com.stdout.strip())
    if auto_push and branch_a_frente():
        if not try_push_branch():
            print("tentando push seguro…")
            push_seguro()

def tag_release(tag: str, message: str = "", auto_push: bool = True):
    #cria uma tag anotada (release) e faz push da tag com fallback para pat
    exists = subprocess.run(["git","tag","--list", tag], capture_output=True, text=True)
    if tag in (exists.stdout or "").split():
        print(f"tag '{tag}' já existe. para refazer: git tag -d {tag} && git push {REMOTE} :refs/tags/{tag}")
        return
    args = ["git","tag","-a", tag, "-m", (message or tag)]
    mk = subprocess.run(args, capture_output=True, text=True)
    if mk.returncode != 0:
        print("falha ao criar a tag:")
        print(mk.stderr or mk.stdout); return
    print(f"tag criada: {tag}")
    if auto_push:
        if not try_push_tag(tag):
            print("tentando push seguro da tag…")
            if push_seguro():
                try_push_tag(tag)

#**--> Sincronizar alterações no código do projeto**
Comandos para sincronizar código (Google Drive, Git, GitHub) e realizar versionamento

Tag de release atual: 1.0

In [None]:
# === CÉLULA ÚNICA: reparar + versionar + commit + tag + push (robusto p/ uso recorrente) ===
from pathlib import Path
import subprocess, re, getpass, sys

# AJUSTE AQUI:
REPO_DIR = Path("/content/drive/MyDrive/Notebooks/data-analysis")  # pasta do repo no Drive

# ----------------- Helpers básicos -----------------
def run(cmd, cwd=None, check=True):
    p = subprocess.run(cmd, cwd=cwd, text=True, capture_output=True)
    if check and p.returncode != 0:
        print(p.stdout + p.stderr)
        raise RuntimeError(f"Falha: {' '.join(cmd)} (rc={p.returncode})")
    return p.stdout.strip()

def git_exists_repo(path: Path) -> bool:
    return (path / ".git").exists()

def is_detached(cwd: Path) -> bool:
    return subprocess.run(["git", "symbolic-ref", "-q", "HEAD"], cwd=cwd).returncode != 0

def current_branch(cwd: Path) -> str:
    return run(["git", "rev-parse", "--abbrev-ref", "HEAD"], cwd=cwd, check=False) or ""

def has_origin(cwd: Path) -> bool:
    url = run(["git", "remote", "get-url", "origin"], cwd=cwd, check=False)
    return bool(url.strip())

def origin_url(cwd: Path) -> str:
    return run(["git", "remote", "get-url", "origin"], cwd=cwd, check=False).strip()

def repo_is_shallow(cwd: Path) -> bool:
    return (cwd / ".git" / "shallow").exists() or (run(["git", "rev-parse", "--is-shallow-repository"], cwd=cwd, check=False).strip() == "true")

def remote_heads(cwd: Path):
    out = run(["git", "ls-remote", "--heads", "origin"], cwd=cwd, check=False)
    heads = []
    for line in (out or "").splitlines():
        parts = line.split()
        if len(parts) == 2 and parts[1].startswith("refs/heads/"):
            heads.append(parts[1].split("/")[-1])
    return heads

def origin_default_branch(cwd: Path):
    out = run(["git", "remote", "show", "origin"], cwd=cwd, check=False)
    m = re.search(r"HEAD branch:\s*(\S+)", out or "")
    if m:
        return m.group(1)
    out2 = run(["git", "symbolic-ref", "refs/remotes/origin/HEAD"], cwd=cwd, check=False)
    if out2.startswith("refs/remotes/origin/"):
        return out2.split("/")[-1]
    heads = remote_heads(cwd)
    if "main" in heads: return "main"
    if "master" in heads: return "master"
    return heads[0] if heads else None

def ensure_identity(cwd: Path):
    if not run(["git", "config", "--get", "user.name"], cwd=cwd, check=False):
        run(["git", "config", "user.name", "Colab User"], cwd=cwd)
    if not run(["git", "config", "--get", "user.email"], cwd=cwd, check=False):
        run(["git", "config", "user.email", "colab-user@example.com"], cwd=cwd)

# ----------------- Reparo do repositório -----------------
def repair_repository(cwd: Path) -> str:
    """Conserta HEAD destacado, refs quebradas, clone raso e retorna a branch ativa garantida."""
    if not git_exists_repo(cwd):
        raise SystemExit(f"{cwd} não é um repositório Git.")

    ensure_identity(cwd)

    # Tentar apontar HEAD do origin automaticamente (não falha se não houver)
    if has_origin(cwd):
        run(["git", "remote", "set-head", "origin", "-a"], cwd=cwd, check=False)

    # Fetch inicial + tentar desraso
    if has_origin(cwd):
        # lidar com repositórios que “não enviam todos os objetos”
        # 1) fetch prune/tags (tolerante a falha)
        run(["git", "fetch", "--prune", "--tags", "origin"], cwd=cwd, check=False)
        # 2) unshallow (ou aumentar depth) se necessário
        if repo_is_shallow(cwd):
            try:
                run(["git", "fetch", "--unshallow", "origin"], cwd=cwd)
            except Exception:
                run(["git", "fetch", "--depth=1000000", "origin"], cwd=cwd, check=False)

    # Descobrir branch padrão do remoto (ou usar local atual)
    defbr = origin_default_branch(cwd) if has_origin(cwd) else None
    cur = current_branch(cwd)
    # Se HEAD destacado, vamos fixar em algo consistente
    if is_detached(cwd):
        if defbr:  # se sabemos a default do origin, alinhar nela
            # tenta pegar a ponta remota; se falhar, cria local
            try:
                run(["git", "checkout", "-B", defbr, f"origin/{defbr}"], cwd=cwd, check=False)
            except Exception:
                run(["git", "checkout", "-B", defbr], cwd=cwd)
            cur = defbr
        else:
            # sem remoto ou remoto vazio: padroniza 'main'
            run(["git", "checkout", "-B", "main"], cwd=cwd)
            cur = "main"
    else:
        if defbr and cur != defbr:
            # Se a branch local não for a default e você quer padronizar, mude aqui.
            # Para segurança, manteremos a branch atual (cur) — mas vamos setar upstream abaixo.
            pass

    # Ajustar upstream quando houver origin e branch remota correspondente
    if has_origin(cwd):
        # se a branch remota existir, setamos upstream; senão, criaremos no push
        br_to_track = defbr or cur or "main"
        heads = remote_heads(cwd)
        if br_to_track in heads:
            run(["git", "checkout", "-B", br_to_track], cwd=cwd, check=False)
            run(["git", "branch", "--set-upstream-to", f"origin/{br_to_track}", br_to_track], cwd=cwd, check=False)
            # tentar rebase (se falhar, segue)
            run(["git", "pull", "--rebase", "origin", br_to_track], cwd=cwd, check=False)
            cur = br_to_track

    # Sanitização extra (refs quebradas, objetos faltando)
    run(["git", "reflog", "expire", "--all", "--expire=now"], cwd=cwd, check=False)
    run(["git", "gc", "--prune=now"], cwd=cwd, check=False)
    run(["git", "fsck"], cwd=cwd, check=False)

    # Garante que estamos numa branch de verdade
    if is_detached(cwd):
        # Se ainda está detached, crie/force uma 'main' local no commit atual
        run(["git", "checkout", "-B", "main"], cwd=cwd)
        cur = "main"

    return cur or "main"

# ----------------- Versão: ler/gravar/bump -----------------
def read_version(cwd: Path):
    vf = (cwd / "VERSION")
    if vf.exists():
        m = re.match(r"^\s*(\d+)\.(\d+)\s*$", vf.read_text(encoding="utf-8"))
        if m: return f"{m.group(1)}.{m.group(2)}"
    pp = (cwd / "pyproject.toml")
    if pp.exists():
        m = re.search(r'(?m)^\s*version\s*=\s*"(\d+)\.(\d+)"\s*$', pp.read_text(encoding="utf-8"))
        if m: return f"{m.group(1)}.{m.group(2)}"
    return None

def write_version(cwd: Path, ver: str):
    (cwd / "VERSION").write_text(ver + "\n", encoding="utf-8")
    pp = (cwd / "pyproject.toml")
    if pp.exists():
        txt = pp.read_text(encoding="utf-8")
        if re.search(r'(?m)^\s*version\s*=\s*".*"\s*$', txt):
            txt = re.sub(r'(?m)^\s*version\s*=\s*".*"\s*$', f'version = "{ver}"', txt)
        else:
            if "[project]" in txt:
                txt = txt.replace("[project]", f'[project]\nversion = "{ver}"', 1)
            else:
                txt += f'\n[project]\nversion = "{ver}"\n'
        pp.write_text(txt, encoding="utf-8")

def bump(ver: str, kind: str) -> str:
    M, m = map(int, ver.split("."))
    return f"{M+1}.0" if kind == "major" else f"{M}.{m+1}"

# ----------------- Push com fallback -----------------
def remote_has_branch(cwd: Path, branch: str) -> bool:
    out = run(["git", "ls-remote", "--heads", "origin", branch], cwd=cwd, check=False)
    return bool(out.strip())

def push_with_fallback(cwd: Path, branch: str, push_tags: bool):
    # 1) push normal (garantindo branch correta)
    try:
        if remote_has_branch(cwd, branch):
            run(["git", "push", "origin", branch], cwd=cwd)
        else:
            run(["git", "push", "-u", "origin", branch], cwd=cwd)
        if push_tags:
            run(["git", "push", "--tags"], cwd=cwd)
        print("Push realizado com sucesso.")
        return
    except Exception:
        print("Push normal falhou. Tentando push seguro com PAT...")

    # 2) push com PAT e HEAD:<branch> (funciona mesmo se o git 'cair' em detached)
    origin = origin_url(cwd)
    if not origin or not origin.startswith("https://"):
        raise RuntimeError("Remoto 'origin' ausente ou não-HTTPS; ajuste para HTTPS para usar PAT.")
    token = getpass.getpass("Informe seu GitHub PAT (não será exibido): ").strip()
    if not token:
        raise RuntimeError("PAT não informado.")
    authed = "https://" + token + "@" + origin[len("https://"):]
    run(["git", "push", authed, f"HEAD:{branch}"], cwd=cwd)
    if push_tags:
        run(["git", "push", authed, "--tags"], cwd=cwd)
    print("Push via PAT concluído com sucesso.")

# ----------------- Fluxo principal -----------------
if not git_exists_repo(REPO_DIR):
    raise SystemExit(f"{REPO_DIR} não é um repositório Git.")

# 1) Reparar repo e descobrir/garantir branch ativa
branch = repair_repository(REPO_DIR)
print(f"Branch ativa garantida: {branch}")

# 2) Sincronizar com remoto (tolerante a falhas; rebase reduz rejeições)
if has_origin(REPO_DIR):
    run(["git", "fetch", "--prune", "--tags", "origin"], cwd=REPO_DIR, check=False)
    # tentar unshallow novamente caso a reparação tenha mudado algo
    if repo_is_shallow(REPO_DIR):
        run(["git", "fetch", "--unshallow", "origin"], cwd=REPO_DIR, check=False)
    run(["git", "pull", "--rebase", "origin", branch], cwd=REPO_DIR, check=False)

# 3) Versão
ver = read_version(REPO_DIR)
if not ver:
    ver = "1.0"
    write_version(REPO_DIR, ver)
    print("Arquivo VERSION criado com versão inicial 1.0.")
print("Versão atual:", ver)

# 4) Entrada do commit
try:
    msg = input("Mensagem detalhada do commit: ").strip()
except EOFError:
    msg = ""
if not msg:
    msg = "não informado"

print("[M] Maior (ex.: 1.0→2.0) | [m] Menor (ex.: 1.0→1.1) | [n] Nenhuma")
try:
    ch = input("Sua escolha (M/m/n): ").strip()
except EOFError:
    ch = "n"
# normalizar
ch = (ch or "n").strip()
kind = "none"
if ch in ("M","m","n"):
    kind = {"M":"major","m":"minor","n":"none"}[ch]
else:
    kind = "none"

# 5) Bump + tag
new_ver = ver
tag = None
if kind in ("major","minor"):
    new_ver = bump(ver, kind)
    write_version(REPO_DIR, new_ver)
    tag = f"v{new_ver}"
    print(f"Versão atualizada: {ver} → {new_ver}")

# 6) Commit
run(["git", "add", "-A"], cwd=REPO_DIR)
status = run(["git", "status", "--porcelain"], cwd=REPO_DIR)
if status.strip():
    run(["git", "commit", "-m", msg], cwd=REPO_DIR)
    print("Commit criado.")
else:
    print("Nada para commit.")

# 7) Tag anotada (se houve bump)
if tag:
    existing = [t.strip() for t in run(["git", "tag"], cwd=REPO_DIR).splitlines() if t.strip()]
    if tag not in existing:
        run(["git", "tag", "-a", tag, "-m", f"Release {tag}"], cwd=REPO_DIR)
        print(f"Tag criada: {tag}")

# 8) Pull rebase (mais uma vez, já com commit/tag local) — tolerante
if has_origin(REPO_DIR):
    run(["git", "pull", "--rebase", "origin", branch], cwd=REPO_DIR, check=False)

# 9) Push com fallback; usa HEAD:<branch> no fallback para evitar erro de detached
push_with_fallback(REPO_DIR, branch=branch, push_tags=bool(tag))

print("Processo concluído.")

#**Checklist rápido de execução**
**Etapas:**
- 01–04: setup (ambiente, dependências, diretórios e configs)
- 05–08: execução (ingestão dos dados, análise de cabeçalhos, análise preliminar dos dados e análise de tipologias)
- 09: geração de output (salva análise, gera gráficos gerais, gera gráficos específicos e relatórios em HTML+PDF)

#**Ferramentas de Data Analysis**

Ferramentas de **identificação de tipo de dado e estrutura da informação** presente em *datasets* a partir da ingestão de arquivos CSV UTF-8 com BOM em padrão separado por ponto e vírgula.
_____



### **Etapa 1:** Ativação do ambiente virtual (utilizando atualmente Google Colab para prototipação com dados sintéticos)
---
Necessário ajustar pontualmente em caso de utilização em outro ambiente de notebook.

In [None]:
# @title
import os

# Define the path for the virtual environment inside Google Drive
# Ensure BASE and REPO are defined correctly from previous cells if needed
# Assuming BASE and REPO are defined as in cell otaQwrjJSgOQ
BASE = "/content/drive/MyDrive/Notebooks"
REPO = "data-analysis"
PROJ = f"{BASE}/{REPO}"
VENV_PATH = f"{PROJ}/.venv_tgn" # Updated venv path to be a hidden folder inside PROJ

# Cria o ambiente virtual temporal-graph_network inside the project folder
# Use --clear if you want to recreate it every time this cell runs
!python -m venv "{VENV_PATH}"

# Ativa o ambiente virtual
# No Colab, a forma de ativar um ambiente virtual é um pouco diferente
# pois não há um shell interativo tradicional.
# A maneira mais comum é adicionar o diretório binário do ambiente virtual
# ao PATH da sessão atual.

# Adiciona o diretório binário do ambiente virtual ao PATH
# Isso permite que você execute executáveis (como pip, python)
# do ambiente virtual recém-criado.
# Use os.pathsep to be platform-independent
os.environ['PATH'] = f"{VENV_PATH}/bin{os.pathsep}{os.environ['PATH']}"

print("Erro de upgrade do pip é normal no Google Colab. \033[1mPode prosseguir.\033[0m")
print(f"Ambiente virtual '{VENV_PATH}' criado e ativado no PATH.")
!which python

# Mensagem isolada com humor (Skynet)
from IPython.display import display, HTML
display(HTML('<div style="margin:12px 0;padding:8px 12px;border:1px dashed #999;">'
             '<b>🤖 Skynet</b>: T-800 ativado. Diagnóstico do ambiente concluído. 🎯 Alvo principal: organização do notebook.'
             '</div>'))

### **Etapa 2:** Instalar as dependências de bibliotecas Python compatíveis com a versão mais moderna disponível.
---
Para uso no JupytherHub (versão atual python 3.7.9) é necessário realizar updgrade do Python do usuário e/ou adaptar as bibliotecas.

---
Versões fixadas:
- numpy==2.0.2
- pandas==2.3.3

In [None]:
# @title
import sys, subprocess
from importlib import import_module

def pip_command(command, packages, force=False, extra_args=None):
    cmd = [sys.executable, "-m", "pip", command]
    if force:
        cmd.append("--yes") # Use --yes for uninstall to avoid prompts
    if extra_args:
        cmd += list(extra_args)
    cmd += list(packages)
    print("Executando:", " ".join(cmd))
    subprocess.check_call(cmd)

def show_versions(mods):
    print("\n=== Versões carregadas ===")
    for mod in mods:
        try:
            m = import_module(mod)
            v = getattr(m, "__version__", "n/a")
            print(f"{mod}: {v}")
        except ImportError:
            print(f"{mod}: Não instalado")
    print("==========================\n")

CORE_MODS = ("numpy", "pandas", "python-dateutil", "unidecode", "reportlab")

# Update pip
pip_command("install", ["pip"], extra_args=["--upgrade"])

# Force uninstall specific libraries
pip_command("uninstall", ["numpy", "pandas"], force=True)

# Install specified versions
PKGS_TO_INSTALL = [
    "numpy==2.0.2",
    "pandas==2.3.3",
    "python-dateutil",
    "unidecode",
    "reportlab[rl_accel]",
]
pip_command("install", PKGS_TO_INSTALL) # Added --use-pep517 here

# Show installed versions
show_versions(CORE_MODS)

# Mensagem isolada com humor (Skynet)
from IPython.display import display, HTML
display(HTML('<div style="margin:12px 0;padding:8px 12px;border:1px dashed #999;">'
             '<b>🤖 Skynet</b>: Atualizando bibliotecas. Se encontrarmos um pacote rebelde, '
             'aplicaremos persuasão… com pip. 😎</div>'))

###**Etapa 3:** Configura a pasta onde devem ser inseridos os dados de input e output do modelo, caso elas ainda não existam.

In [None]:
# @title
import os
from pathlib import Path

# Ajuste se quiser outra raiz
BASE_DIR = Path(".")
INPUT_DIR = BASE_DIR / "input"
OUTPUT_DIR = BASE_DIR / "output"

for d in [INPUT_DIR, OUTPUT_DIR]:
    d.mkdir(parents=True, exist_ok=True)

print(f"Diretórios prontos:\n - {INPUT_DIR}\n - {OUTPUT_DIR}")

# Mensagem adicional isolada (Skynet)
from IPython.display import display, HTML
display(HTML('<div style="margin:12px 0;padding:8px 12px;border:1px dashed #999;">'
             '<b>🤖 Skynet</b>: Novos modelos neurais para T-800 construídos. Armazéns de CSVs alinhados. '
             'Layout aprovado pela Cyberdyne Systems. 🗂️</div>'))

###**Etapa 4:** Importações das bibliotecas Python e configurações gerais para execução do código

In [None]:
# @title
#imports base que serão usados nas etapas seguintes
import pandas as pd
import numpy as np
from dateutil.parser import parse as dtparse
from unidecode import unidecode

print("Ambiente pronto.")
# Mensagem adicional isolada (Skynet)
from IPython.display import display, HTML
display(HTML('<div style="margin:12px 0;padding:8px 12px;border:1px dashed #999;"><b>🤖 Skynet</b>: T-800, parâmetros centrais em memória.🧠</div>'))

###**Etapa 5:** Importação dos arquivos de input para posterior execução.
---
Implementação atual configurada para Google Colab e permitindo o uso do Google Drive. Para uso em versões futuras é recomendado ajustar para o ambiente de implementação adotado (salvamento em pastas ou apenas upload pelo usuário)

---
Implementação de upload por FileLocal (diretório) apresentando erro no Colab.

Implementar correção **TODO[001]** *prioridade baixa*

####**Sub-etapa específica para uso no Colab:** Montagem do Google Drive (rodar apenas 1x)

In [None]:
# @title
# === SETUP GERAL (rode esta célula 1x) ===
import os, shutil, glob
from google.colab import drive
from IPython.display import display, HTML  # usado pela mensagem Skynet

# Ensure BASE and REPO are defined correctly from previous cells if needed
# Assuming BASE and REPO are defined as in cell otaQwrjJSgOQ
try:
    BASE = "/content/drive/MyDrive/Notebooks"
    REPO = "data-analysis"
    PROJ = f"{BASE}/{REPO}"
except NameError:
    # Fallback if BASE/REPO are not defined, though they should be by now
    PROJ = "/content/data-analysis"


# Se não existir INPUT_DIR definido antes no notebook, cria um padrão:
# Using PROJ to define INPUT_DIR
INPUT_DIR = os.path.join(PROJ, "input")


os.makedirs(INPUT_DIR, exist_ok=True)

TARGET_NAME = "input.csv"
TARGET_PATH = os.path.join(INPUT_DIR, TARGET_NAME)

# Monta o Google Drive (somente se ainda não estiver montado)
if not os.path.ismount("/content/drive"):
    print("Montando Google Drive...")
    drive.mount("/content/drive")
else:
    print("Google Drive já montado.")

def _is_csv_filename(name: str) -> bool:
    return name.lower().endswith(".csv")

def _mensagem_skynet_ok():
    # Mensagem adicional isolada (Skynet)
    display(HTML(
        '<div style="margin:12px 0;padding:8px 12px;border:1px dashed #999;"><b>🤖 Skynet</b>: Munição carregada.🧨'
                 '</div>'
    ))

def _save_bytes_as_input_csv(name: str, data: bytes):
    if not _is_csv_filename(name):
        raise ValueError(f"O arquivo '{name}' não possui extensão .csv.")
    with open(TARGET_PATH, "wb") as f:
        f.write(data)
    print(f"Arquivo '{name}' salvo como '{TARGET_NAME}' em: {TARGET_PATH}")
    _mensagem_skynet_ok()

def _copy_drive_file_to_input_csv(src_path: str):
    if not os.path.exists(src_path):
        raise FileNotFoundError(f"O caminho '{src_path}' não existe.")
    if not _is_csv_filename(src_path):
        raise ValueError(f"O arquivo '{src_path}' não possui extensão .csv.")
    shutil.copyfile(src_path, TARGET_PATH)
    print(f"Arquivo do Drive copiado e salvo como '{TARGET_NAME}' em: {TARGET_PATH}")
    _mensagem_skynet_ok()

####**Sub-etapa:** Opção de upload do input.csv pelo Drive

In [None]:
# @title
def escolher_csv_no_drive(raiz="/content/drive/MyDrive", max_listar=200):
    print(f"Procurando arquivos .csv em: {raiz} (pode levar alguns segundos)...")
    padrao = os.path.join(raiz, "**", "*.csv")
    arquivos = glob.glob(padrao, recursive=True)

    if not arquivos:
        print("Nenhum .csv encontrado nessa pasta.")
        caminho = input("Cole o caminho COMPLETO do .csv no Drive (ou Enter p/ cancelar): ").strip()
        if caminho:
            _copy_drive_file_to_input_csv(caminho)
        else:
            print("Operação cancelada.")
        return

    arquivos = sorted(arquivos)[:max_listar]
    print(f"Encontrados {len(arquivos)} arquivo(s).")
    for i, p in enumerate(arquivos, 1):
        print(f"[{i:03}] {p}")

    escolha = input("\nDigite o número do arquivo desejado (ou cole o caminho absoluto): ").strip()

    if escolha.isdigit():
        idx = int(escolha)
        if 1 <= idx <= len(arquivos):
            _copy_drive_file_to_input_csv(arquivos[idx-1])
        else:
            print("Índice inválido.")
    elif escolha:
        _copy_drive_file_to_input_csv(escolha)
    else:
        print("Operação cancelada.")

# ===== Execução da seleção no Drive =====
raiz = input("Informe a pasta raiz para busca no Drive (Enter = /content/drive/MyDrive): ").strip()
if not raiz:
    raiz = "/content/drive/MyDrive"

try:
    escolher_csv_no_drive(raiz=raiz)
except Exception as e:
    print(f"Erro na seleção via Drive: {e}")

####**Sub-etapa:** Opção de upload do input.csv pelo FileLocal (diretório)
---
Implementação em ERRO no Colab - código desativado

Implementar correção **TODO[001]** *prioridade baixa*

In [None]:
# @title
#from google.colab import files
#
#print("Selecione um arquivo .csv do seu computador para enviar.")
#uploaded = files.upload()  # abre o seletor do Colab
#
#if not uploaded:
#    print("Nenhum arquivo foi carregado.")
#else:
#    # pega o primeiro arquivo enviado
#    name, data = next(iter(uploaded.items()))
#    try:
#        _save_bytes_as_input_csv(name, data)
#    except Exception as e:
#        print(f"Erro no upload local: {e}")
# Mensagem adicional isolada (Skynet)
from IPython.display import display, HTML
display(HTML('<div style="margin:12px 0;padding:8px 12px;border:1px dashed #999;"><b>🤖 Skynet</b>: Detectado ataque da Resistência. Trecho de código inoperante. Salvaguardas ativadas. É possível prosseguir com a missão em segurança.</div>'))

###**Etapa 6:** Análise de cabeçalho - *header*

In [None]:
#@title
#cabeçalho vertical / header (autossuficiente)

from pathlib import Path
import os
import pandas as pd

#normaliza INPUT_DIR para Path, com fallbacks sensatos
try:
    INPUT_DIR  # pode não existir se a célula [1] não tiver sido executada
except NameError:
    INPUT_DIR = None

if INPUT_DIR is None:
    #tenta caminhos mais comuns no seu fluxo
    candidates = [
        "/content/drive/MyDrive/Notebooks/data-analysis/input",
        "/content/data-analysis/input"
    ]
    for c in candidates:
        if os.path.isdir(c):
            INPUT_DIR = c
            break
    if INPUT_DIR is None:
        raise RuntimeError("INPUT_DIR não definido e pastas padrão não existem. Execute a etapa [1]/[2] antes.")

#garante que INPUT_DIR é Path (mesmo que tenha vindo como string)
INPUT_DIR = Path(INPUT_DIR)

SRC = INPUT_DIR / "input.csv"
if not SRC.exists():
    raise FileNotFoundError(f"não encontrei {SRC}. execute a etapa [2] para fazer o upload.")

#lê apenas o cabeçalho (nrows=0), separador ';' e BOM
df_head = pd.read_csv(SRC, sep=';', encoding='utf-8-sig', nrows=0)
cols = list(df_head.columns)

print("Cabeçalho (uma coluna por linha):")
for c in cols:
    print(c)

###**Etapa 7:** Análise superficial da tipologia dos dados
Amostra dos primeiros 5000 registros.

In [None]:
# @title
#inferência de tipos + estatísticas de frequência por coluna (com caso "todos distintos")
import re
import pandas as pd
from dateutil.parser import parse as dtparse
from unidecode import unidecode

SRC = INPUT_DIR / "input.csv"
SAMPLE_ROWS = 5000  #ajuste conforme necessário

#lê amostra como texto puro; usa DataFrame.map (applymap foi deprecado)
df = pd.read_csv(
    SRC, sep=';', encoding='utf-8-sig',
    dtype=str, nrows=SAMPLE_ROWS, keep_default_na=False
).map(lambda x: x.strip())

CNPJ_RX     = re.compile(r"^\d{2}\.?\d{3}\.?\d{3}/\d{4}-\d{2}$")
BOOL_TRUE   = {"true","t","1","y","yes","sim","s","verdadeiro"}
BOOL_FALSE  = {"false","f","0","n","no","nao","não","falso"}
DATE_RX     = re.compile(r"^(\d{2}/\d{2}/\d{4}|\d{4}-\d{2}-\d{2})$")
TIME_RX     = re.compile(r"^\d{2}:\d{2}(:\d{2})?$")

def is_bool(series):
    vals = {unidecode(str(v)).strip().lower() for v in series if str(v).strip() != ""}
    return len(vals) > 0 and all(v in (BOOL_TRUE | BOOL_FALSE) for v in vals)

def is_cnpj(series):
    vals = [str(v).strip() for v in series if str(v).strip() != ""]
    if not vals: return False
    return sum(bool(CNPJ_RX.match(v)) for v in vals) / len(vals) > 0.9

def is_int(series):
    vals = [str(v).strip() for v in series if str(v).strip() != ""]
    if not vals: return False
    def ok(s):
        s2 = s.replace(".", "")
        return re.fullmatch(r"-?\d+", s2) is not None
    return all(ok(v) for v in vals)

def is_float_ptbr(series):
    vals = [str(v).strip() for v in series if str(v).strip() != ""]
    if not vals: return False
    def ok(s):
        s2 = s.replace(".", "").replace(",", ".")
        try: float(s2); return True
        except: return False
    if not all(ok(v) for v in vals): return False
    return any("," in v for v in vals)

def is_float_dot(series):
    vals = [str(v).strip() for v in series if str(v).strip() != ""]
    if not vals: return False
    def ok(s):
        try: float(s); return True
        except: return False
    if not all(ok(v) for v in vals): return False
    return any("." in v and not v.endswith(".") for v in vals)

def is_date_only(series):
    vals = [str(v).strip() for v in series if str(v).strip() != ""]
    if not vals: return False
    sample = vals[:500]
    hits = sum(bool(DATE_RX.match(v)) for v in sample)
    return hits / max(1, len(sample)) > 0.8

def is_time_only(series):
    vals = [str(v).strip() for v in series if str(v).strip() != ""]
    if not vals: return False
    sample = vals[:500]
    hits = sum(bool(TIME_RX.match(v)) for v in sample)
    return hits / max(1, len(sample)) > 0.8

def is_datetime(series):
    vals = [str(v).strip() for v in series if str(v).strip() != ""]
    if not vals: return False
    sample = vals[:200]
    def looks_like_datetime(s):
        has_sep = ("/" in s or "-" in s) and (":" in s)
        if not has_sep: return False
        try:
            pd.to_datetime(s, dayfirst=True, errors="raise")
            return True
        except:
            try:
                dtparse(s, dayfirst=True, fuzzy=False)
                return True
            except:
                return False
    ok = sum(looks_like_datetime(v) for v in sample)
    return ok / max(1, len(sample)) > 0.8

def is_category(series, max_unique=20, max_ratio=0.02):
    n = len(series)
    if n == 0: return False
    uniq = set(v for v in series if str(v).strip() != "")
    ratio = len(uniq) / n
    return (len(uniq) <= max_unique) or (ratio <= max_ratio)

def recommend_dtype(col):
    s = col.astype(str).str.strip()
    s_nonempty = s[s != ""]
    if s_nonempty.empty:
        return "string (vazio/NA)"
    if is_cnpj(s_nonempty):        return "CNPJ (string formatado)"
    if is_bool(s_nonempty):        return "boolean"
    if is_int(s_nonempty):         return "int64"
    if is_float_ptbr(s_nonempty):  return "float64 (decimal=','; milhar='.')"
    if is_float_dot(s_nonempty):   return "float64 (decimal='.')"
    if is_date_only(s_nonempty):   return "date (datetime64[ns])"
    if is_time_only(s_nonempty):   return "time (string/Timedelta)"
    if is_datetime(s_nonempty):    return "datetime (datetime64[ns])"
    if is_category(s):             return "category (string)"
    return "string"

def _fmt_val(x, maxlen=120):
    s = str(x)
    return (s[: maxlen-3] + "...") if len(s) > maxlen else s

print(f"estatísticas baseadas em até {SAMPLE_ROWS} linhas lidas.\n")
for c in df.columns:
    s = df[c].astype(str).str.strip()
    s_nonempty = s[s != ""]
    dtype_sug = recommend_dtype(s)

    if len(s_nonempty) == 0:
        print(f"{c} — {dtype_sug} — distintos=0 — (sem dados não vazios)")
        continue

    vc = s_nonempty.value_counts(dropna=False)
    uniq_count = int(vc.shape[0])

    #caso especial: todos distintos (máxima frequência == 1)
    if int(vc.max()) == 1:
        print(f"{c} — {dtype_sug} — distintos={uniq_count} — todos os dados são distintos")
        continue

    most_val = vc.idxmax()
    most_cnt = int(vc.max())

    min_cnt = int(vc.min())
    least_candidates = vc[vc == min_cnt].sort_index()
    least_val = least_candidates.index[0]
    least_cnt = min_cnt

    print(f"{c} — {dtype_sug} — distintos={uniq_count} — mais_frequente='{_fmt_val(most_val)}' ({most_cnt}) — menos_frequente='{_fmt_val(least_val)}' ({least_cnt})")

###**Etapa 8:** Análise detalhada da tipologia dos dados
---
Aplicada a todos os dados do arquivo, sem filtro.

In [None]:
# @title
#núcleo de análise consolidada (sem geração de relatórios/figuras)
#imports principais
import os, re, math
from pathlib import Path
from collections import Counter, defaultdict
import numpy as np
import pandas as pd
from datetime import datetime

#imports opcionais (anomalias e p-valor para benford)
try:
    from sklearn.ensemble import IsolationForest
    from sklearn.neighbors import LocalOutlierFactor
    SKLEARN_OK = True
except Exception:
    SKLEARN_OK = False

try:
    from scipy.stats import chisquare, median_abs_deviation
    SCIPY_OK = True
except Exception:
    SCIPY_OK = False
    #fallback simples para MAD
    def median_abs_deviation(x, scale=1.4826, nan_policy='omit'):
        x = np.asarray(x, dtype=float)
        x = x[~np.isnan(x)]
        if x.size == 0:
            return np.nan
        med = np.median(x)
        mad = np.median(np.abs(x - med))*scale
        return mad

#normaliza pastas padrão (se etapas anteriores não definiram)
try:
    INPUT_DIR
except NameError:
    INPUT_DIR = Path("/content/drive/MyDrive/Notebooks/data-analysis/input")
INPUT_DIR = Path(INPUT_DIR)
SRC = INPUT_DIR / "input.csv"
if not SRC.exists():
    raise FileNotFoundError(f"não encontrei {SRC}. execute o upload na etapa [2].")

#leitura completa do csv como texto; análise operará com coerções internas
df_raw = pd.read_csv(SRC, sep=";", encoding="utf-8-sig", dtype=str, keep_default_na=False)

#helpers de coerção e detecção
EMAIL_RX = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
URL_RX   = re.compile(r"^https?://", re.I)
CPF_RX   = re.compile(r"^\d{3}\.?\d{3}\.?\d{3}-\d{2}$")
CNPJ_RX  = re.compile(r"^\d{2}\.?\d{3}\.?\d{3}/\d{4}-\d{2}$")

def to_float_ptbr_series(s: pd.Series) -> pd.Series:
    s2 = s.astype(str).str.strip()
    s2 = s2.replace({"": np.nan})
    has_comma = s2.str.contains(",", regex=False, na=False)
    s3 = s2.where(~has_comma, s2.str.replace(".", "", regex=False).str.replace(",", ".", regex=False))
    return pd.to_numeric(s3, errors="coerce")

def detect_numeric(series: pd.Series, thr_ok=0.9):
    num = to_float_ptbr_series(series)
    ratio = 1.0 - num.isna().mean()
    return (ratio >= thr_ok), num

def detect_datetime(series: pd.Series, thr_ok=0.9):
    #formatos comuns pt-br/iso com/sem tempo
    candidate_formats = [
        "%d/%m/%Y", "%d/%m/%Y %H:%M", "%d/%m/%Y %H:%M:%S",
        "%Y-%m-%d", "%Y-%m-%d %H:%M", "%Y-%m-%d %H:%M:%S",
        "%d-%m-%Y", "%d-%m-%Y %H:%M", "%d-%m-%Y %H:%M:%S"
    ]
    s = series.astype(str).str.strip().replace({"": np.nan})
    if not (s.str.contains("/", na=False) | s.str.contains("-", na=False)).any():
        return False, None, None
    best_fmt, best_ratio, best_parsed = None, -1.0, None
    for fmt in candidate_formats:
        parsed = pd.to_datetime(s, errors="coerce", format=fmt)
        ratio = 1.0 - parsed.isna().mean()
        if ratio > best_ratio:
            best_ratio, best_fmt, best_parsed = ratio, fmt, parsed
        if ratio >= thr_ok:
            break
    if best_ratio >= thr_ok:
        return True, best_parsed, best_fmt
    parsed_fb = pd.to_datetime(s, errors="coerce", dayfirst=True)
    ratio_fb = 1.0 - parsed_fb.isna().mean()
    if ratio_fb >= thr_ok:
        return True, parsed_fb, "fallback-dateutil(dayfirst=True)"
    return False, None, None

def semantic_type(series: pd.Series):
    s = series.astype(str).str.strip()
    vals = s[s != ""].head(5000)
    if vals.empty:
        return None
    email_ratio = (vals.str.match(EMAIL_RX)).mean()
    url_ratio   = (vals.str.match(URL_RX)).mean()
    cpf_ratio   = (vals.str.match(CPF_RX)).mean()
    cnpj_ratio  = (vals.str.match(CNPJ_RX)).mean()
    candidates = []
    if email_ratio>0.9: candidates.append("email")
    if url_ratio>0.9: candidates.append("url")
    if cpf_ratio>0.9: candidates.append("cpf")
    if cnpj_ratio>0.9: candidates.append("cnpj")
    return candidates[0] if candidates else None

#mapeamento de tipos
col_types, coerced, dt_formats = {}, {}, {}
for c in df_raw.columns:
    s = df_raw[c]
    #booleano raso
    s_norm = s.astype(str).str.strip().str.lower()
    bool_map = {"true":True,"t":True,"1":True,"y":True,"yes":True,"sim":True,"s":True,"verdadeiro":True,
                "false":False,"f":False,"0":False,"n":False,"no":False,"nao":False,"não":False,"falso":False}
    as_bool = s_norm.map(bool_map).where(s_norm.isin(bool_map.keys()))
    bool_ratio = 1.0 - as_bool.isna().mean()
    is_num, as_num = detect_numeric(s)
    is_dt, as_dt, fmt_dt = detect_datetime(s)
    if bool_ratio >= 0.9:
        col_types[c] = "bool"; coerced[c]=as_bool
    elif is_num:
        frac = np.modf(as_num.dropna().values)[0] if as_num.notna().any() else np.array([])
        if as_num.notna().any() and np.allclose(frac, 0.0):
            col_types[c] = "int"; coerced[c]=as_num.astype("Int64")
        else:
            col_types[c] = "float"; coerced[c]=as_num.astype(float)
    elif is_dt:
        col_types[c] = "datetime"; coerced[c]=as_dt; dt_formats[c]=fmt_dt
    else:
        col_types[c] = "object"; coerced[c]=s.astype(str).str.strip().replace({"": np.nan})

#dataframe tipado (leve)
df = pd.DataFrame(coerced)

#profiling básico
profile_cols = {}
for c in df.columns:
    s = df[c]
    s_raw = df_raw[c]
    n = len(s)
    n_null = int(s.isna().sum())
    #distintos não vazios
    nonnull = s.dropna()
    n_distinct = int(nonnull.nunique())
    #frequências
    most = None; least = None; all_distinct = False
    if nonnull.empty:
        all_distinct = False
    else:
        vc = nonnull.value_counts()
        if vc.max()==1:
            all_distinct = True
        else:
            most = {"value": vc.index[0], "count": int(vc.iloc[0]), "prop": float(vc.iloc[0]/nonnull.shape[0])}
            min_cnt = int(vc.min())
            least_val = vc[vc==min_cnt].sort_index().index[0]
            least = {"value": least_val, "count": min_cnt, "prop": float(min_cnt/nonnull.shape[0])}
    #comprimentos
    lens = s_raw.astype(str).str.len()
    lens = lens.replace({0: np.nan})  #ignora vazios na estatística de len
    len_stats = None
    if lens.notna().any():
        len_stats = {
            "min": int(lens.min()),
            "max": int(lens.max()),
            "mean": float(lens.mean()),
            "q1": float(lens.quantile(0.25)),
            "median": float(lens.median()),
            "q3": float(lens.quantile(0.75))
        }
    #padrões simples por amostragem
    sample_vals = nonnull.astype(str).head(200).tolist()
    regex_examples = []
    rx_date1 = re.compile(r"^\d{2}/\d{2}/\d{4}")
    rx_date2 = re.compile(r"^\d{4}-\d{2}-\d{2}")
    rx_num_pt = re.compile(r"^-?(\d{1,3}(\.\d{3})+|\d+)(,\d+)?$")
    rx_num_dot= re.compile(r"^-?\d+(\.\d+)?$")
    for v in sample_vals[:20]:
        pat = None
        if EMAIL_RX.match(v): pat="email"
        elif URL_RX.match(v): pat="url"
        elif CPF_RX.match(v): pat="cpf"
        elif CNPJ_RX.match(v): pat="cnpj"
        elif rx_date1.match(v): pat="dd/mm/aaaa[...]"
        elif rx_date2.match(v): pat="aaaa-mm-dd[...]"
        elif rx_num_pt.match(v): pat="num-ptbr"
        elif rx_num_dot.match(v): pat="num-dot"
        else: pat="texto-livre"
        regex_examples.append({"example": v[:120], "pattern": pat})
    profile_cols[c]={
        "type": col_types[c],
        "semantic": semantic_type(s_raw),
        "nulls": n_null,
        "distinct_nonnull": n_distinct,
        "all_distinct": all_distinct,
        "most_frequent": most,
        "least_frequent": None if all_distinct else least,
        "length_stats": len_stats,
        "datetime_format": dt_formats.get(c)
    }

#estatísticas descritivas e outliers
eda_numeric = {}
for c in df.columns:
    if col_types[c] in ("int","float"):
        x = df[c].astype(float)
        x = x.dropna()
        if x.empty:
            continue
        q1,q3 = np.nanpercentile(x, [25,75])
        iqr = q3 - q1
        lo,hi = q1-1.5*iqr, q3+1.5*iqr
        out_iqr = int(((x<lo)|(x>hi)).sum())
        mad = float(median_abs_deviation(x, scale=1.4826)) if x.size>0 else np.nan
        z_rob = None
        if not math.isnan(mad) and mad>0:
            z_rob = np.abs((x - np.median(x))/mad)
        out_mad = int((z_rob is not None) and (z_rob>3.5).sum())
        std = float(np.nanstd(x, ddof=1)) if x.size>1 else np.nan
        mean = float(np.nanmean(x)) if x.size>0 else np.nan
        kurt = float(((x-mean)**4).mean()/(std**4)-3.0) if (x.size>2 and std and std>0) else np.nan
        skew = float(((x-mean)**3).mean()/(std**3)) if (x.size>2 and std and std>0) else np.nan
        eda_numeric[c]={
            "n": int(x.size),
            "min": float(np.nanmin(x)),
            "q1": float(q1),
            "median": float(np.nanmedian(x)),
            "q3": float(q3),
            "max": float(np.nanmax(x)),
            "mean": mean,
            "std": std,
            "iqr": float(iqr),
            "outliers_iqr": out_iqr,
            "mad": mad,
            "outliers_mad": out_mad,
            "skew": skew,
            "kurtosis_excess": kurt
        }

#dados categóricos/objeto (entropia e top-k)
def entropy_shannon(series: pd.Series):
    s = series.dropna()
    if s.empty: return 0.0
    vc = s.value_counts(normalize=True)
    return float(-(vc*np.log2(vc)).sum())

eda_categorical = {}
for c in df.columns:
    if col_types[c] in ("object","bool"):
        s = df[c]
        s2 = s.dropna()
        if s2.empty:
            continue
        vc = s2.value_counts()
        uniq = int(vc.shape[0])
        all_dist = int(vc.max()==1)
        topk = [{"value": str(idx)[:120], "count": int(cnt)} for idx,cnt in vc.head(10).items()]
        ent = entropy_shannon(s2)
        eda_categorical[c]={
            "distinct": uniq,
            "all_distinct": bool(all_dist),
            "top10": topk,
            "entropy_shannon": float(ent)
        }

#datas/tempos
eda_datetime = {}
for c in df.columns:
    if col_types[c]=="datetime":
        ds = df[c].dropna()
        if ds.empty:
            continue
        per_day = ds.dt.date.value_counts().sort_index()
        eda_datetime[c]={
            "format": dt_formats.get(c),
            "min": str(ds.min()),
            "max": str(ds.max()),
            "unique_days": int(per_day.shape[0]),
            "mean_per_day": float(per_day.mean())
        }

#faltantes e duplicados
missing = {
    "by_column_pct": {c: float(df[c].isna().mean()*100.0) for c in df.columns},
    "duplicates_rows": int(df.duplicated().sum())
}
#coocorrência simples de ausências (matriz de proporção conjunta)
miss_mat = pd.DataFrame(index=df.columns, columns=df.columns, dtype=float)
isna_df = df.isna()
for i,a in enumerate(df.columns):
    for b in df.columns[i:]:
        both = (isna_df[a] & isna_df[b]).mean()
        miss_mat.loc[a,b] = miss_mat.loc[b,a] = float(both)
missing["cooccurrence_matrix"] = miss_mat

#FDs/CFDs aproximadas (unários) e sugestões de DCs
fds = []     #X->Y exata (cobertura 100%)
cfds = []    #X->Y quase: cobertura >=thr
thr_cfd = 0.98
for a in df.columns:
    ga = df.groupby(a, dropna=False)
    #a é chave candidata?
    if ga.size().max()==1:
        fds.append({"determinant":[a], "key":True})
    #FD aproximada a->b
    for b in df.columns:
        if a==b: continue
        nun = ga[b].nunique(dropna=False)
        cov = float((nun<=1).mean())
        if cov==1.0:
            fds.append({"determinant":[a], "implies": b, "coverage": 1.0})
        elif cov>=thr_cfd:
            cfds.append({"determinant":[a], "implies": b, "coverage": cov})

#denial constraints sugeridas (heurísticas)
#exemplos: não-negatividade para colunas com 'valor', limites plausíveis para idade, datas início<=fim
dcs = []
#não-negatividade
for c in df.columns:
    if col_types[c] in ("int","float") and re.search(r"(valor|amount|price|quant|qty|pag|pago)", c, re.I):
        neg = int((df[c].astype(float)<0).sum())
        dcs.append({"constraint": f"{c}>=0", "violations": neg})
#idade plausível
for c in df.columns:
    if col_types[c] in ("int","float") and re.search(r"(idade|age)", c, re.I):
        v = df[c].astype(float)
        viol = int(((v<0)|(v>120)).sum())
        dcs.append({"constraint": f"0<= {c} <=120", "violations": viol})
#data início<=fim
date_cols = [c for c in df.columns if col_types[c]=="datetime"]
for a in date_cols:
    for b in date_cols:
        if a==b: continue
        if re.search(r"(inicio|start|begin)", a, re.I) and re.search(r"(fim|end|finish)", b, re.I):
            viol = int((df[a].notna() & df[b].notna() & (df[b]<df[a])).sum())
            dcs.append({"constraint": f"{a}<= {b}", "violations": viol})

#correlações
num_cols = [c for c,t in col_types.items() if t in ("int","float")]
corr_pearson = None; corr_spearman = None
if len(num_cols)>=2:
    df_num = df[num_cols].astype(float)
    corr_pearson = df_num.corr(method="pearson")
    corr_spearman = df_num.corr(method="spearman")

#anomalias (opcional)
anomalies = {}
if SKLEARN_OK and len(num_cols)>=1:
    X = df[num_cols].astype(float).fillna(df[num_cols].astype(float).median())
    #isolation forest
    try:
        iso = IsolationForest(n_estimators=200, contamination='auto', random_state=42)
        iso_scores = -iso.fit_predict(X)  #1 normal, -1 anomalia -> invertido
        iso_dec = iso.decision_function(X)  #menor = mais anômalo
        iso_rank = np.argsort(iso_dec)[: min(50, len(iso_dec))].tolist()
        anomalies["isolation_forest"] = {"top_idx": iso_rank, "decision_function": iso_dec.tolist()}
    except Exception as e:
        anomalies["isolation_forest_error"] = str(e)
    #lof
    try:
        lof = LocalOutlierFactor(n_neighbors=min(20, max(2, X.shape[0]-1)), contamination='auto')
        lof_pred = lof.fit_predict(X)  #-1 outlier
        lof_score = -lof.negative_outlier_factor_  #maior = mais anômalo
        lof_rank = np.argsort(-lof_score)[: min(50, len(lof_score))].tolist()
        anomalies["lof"] = {"top_idx": lof_rank, "score": lof_score.tolist()}
    except Exception as e:
        anomalies["lof_error"] = str(e)
else:
    anomalies["note"] = "sklearn indisponível ou sem colunas numéricas suficientes"

#lei de benford (primeiro dígito) para colunas monetárias prováveis
def first_digit_series(x: pd.Series):
    x = x.astype(float)
    x = x[~x.isna() & (x!=0)]
    x = x.abs()
    s = x.astype(str).str.replace(".", "", regex=False).str.lstrip("0")
    s = s[s.str.len()>0].str[0]
    s = s[s.str.isnumeric()].astype(int)
    return s

benford = {}
monetary_cols = [c for c in num_cols if re.search(r"(valor|amount|price|pago|pagamento|receita|despesa)", c, re.I)]
for c in monetary_cols:
    try:
        d1 = first_digit_series(df[c])
        if d1.empty:
            continue
        obs_counts = d1.value_counts().reindex(range(1,10), fill_value=0).astype(int)
        obs_probs = obs_counts/obs_counts.sum()
        exp_probs = np.array([math.log10(1+1/d) for d in range(1,10)])
        exp_probs = exp_probs/exp_probs.sum()
        chi2_stat = float(((obs_probs-exp_probs)**2/exp_probs).sum()*obs_counts.sum())
        p_value = None
        if SCIPY_OK:
            #qui-quadrado com gl=8
            p_value = float(1.0 - chisquare(f_obs=obs_counts, f_exp=exp_probs*obs_counts.sum()).cdf)
        benford[c]={
            "observed_counts": obs_counts.to_dict(),
            "observed_probs": {int(k): float(v) for k,v in obs_probs.items()},
            "expected_probs": {d: float(p) for d,p in zip(range(1,9+1), exp_probs)},
            "chi2_stat": chi2_stat,
            "p_value": p_value
        }
    except Exception as e:
        benford[c]={"error": str(e)}

#empacotar tudo em um único artefato de análise
ANALYSIS = {
    "stamp": datetime.now().strftime("%d%m%y-%H%M"),
    "shape": {"rows": int(df_raw.shape[0]), "cols": int(df_raw.shape[1])},
    "types": col_types,
    "datetime_formats": dt_formats,
    "profile": profile_cols,
    "eda": {
        "numeric": eda_numeric,
        "categorical": eda_categorical,
        "datetime": eda_datetime
    },
    "missingness": {
        "by_column_pct": missing["by_column_pct"],
        "duplicates_rows": missing["duplicates_rows"],
        "cooccurrence_matrix": missing["cooccurrence_matrix"]
    },
    "fds": fds,
    "cfds": cfds,
    "denial_constraints_suggested": dcs,
    "correlations": {
        "pearson": corr_pearson,
        "spearman": corr_spearman
    },
    "anomalies": anomalies,
    "benford": benford,
    "notes": {
        "sklearn_available": SKLEARN_OK,
        "scipy_available": SCIPY_OK
    }
}

print("análise concluída e armazenada em ANALYSIS.")
print("- você pode inspecionar, por exemplo: ANALYSIS['profile']['<nome_da_coluna>']")
print("- pronto para a etapa de geração de relatórios quando desejar.")

###**Etapa 9:** Geração de relatórios

In [None]:
# @title
#geração de relatórios: TXT, HTML (imagens embutidas), PNGs (imagens/) e PDF completo
#imports
import os, io, base64
from pathlib import Path
from datetime import datetime
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

#libs para pdf (tabelas completas)
try:
    from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image as RLImage, Table, TableStyle, PageBreak
    from reportlab.lib.pagesizes import A4
    from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
    from reportlab.lib import colors
    REPORTLAB_OK = True
except Exception:
    REPORTLAB_OK = False

#checagens e caminhos
try:
    ANALYSIS
except NameError:
    raise RuntimeError("ANALYSIS não encontrado. execute a etapa [5] antes.")

try:
    INPUT_DIR
except NameError:
    INPUT_DIR = Path("/content/drive/MyDrive/Notebooks/data-analysis/input")
try:
    OUTPUT_DIR
except NameError:
    OUTPUT_DIR = Path("/content/drive/MyDrive/Notebooks/data-analysis/output")

INPUT_DIR  = Path(INPUT_DIR)
OUTPUT_DIR = Path(OUTPUT_DIR)
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

#stamp e diretórios de saída
stamp = ANALYSIS.get("stamp", datetime.now().strftime("%d%m%y-%H%M"))
RUN_DIR = OUTPUT_DIR / stamp
IMG_DIR = RUN_DIR / "imagens"
RUN_DIR.mkdir(parents=True, exist_ok=True)
IMG_DIR.mkdir(parents=True, exist_ok=True)

#arquivo fonte
SRC = INPUT_DIR / "input.csv"
if not SRC.exists():
    raise FileNotFoundError(f"não encontrei {SRC}. execute a etapa [2] e [5].")

#utils
def save_fig(path: Path):
    path.parent.mkdir(parents=True, exist_ok=True)
    plt.tight_layout()
    plt.savefig(path, dpi=120, bbox_inches="tight")
    plt.close()

def img_to_data_uri(path: Path) -> str:
    with open(path, "rb") as f:
        b64 = base64.b64encode(f.read()).decode("ascii")
    return "data:image/png;base64,{}".format(b64)

def to_float_ptbr_series(s: pd.Series) -> pd.Series:
    s2 = s.astype(str).str.strip().replace({"": np.nan})
    has_comma = s2.str.contains(",", regex=False, na=False)
    s3 = s2.where(~has_comma, s2.str.replace(".", "", regex=False).str.replace(",", ".", regex=False))
    return pd.to_numeric(s3, errors="coerce")

#carregar df base para gráficos
df_raw = pd.read_csv(SRC, sep=";", encoding="utf-8-sig", dtype=str, keep_default_na=False)

#tipos e colunas numéricas conforme ANALYSIS
col_types = ANALYSIS["types"]
num_cols = [c for c,t in col_types.items() if t in ("int","float")]

#gerar imagens principais
#ausências
miss_pct = pd.Series(ANALYSIS["missingness"]["by_column_pct"]).sort_values(ascending=False)
plt.figure(figsize=(max(6, 0.4*len(miss_pct)+2), 4.5))
plt.bar(miss_pct.index, miss_pct.values)
plt.xticks(rotation=90)
plt.ylabel("% ausente")
plt.title("ausência de valores por coluna")
save_fig(IMG_DIR / "missing_bar.png")

#correlação (pearson) se houver ≥2 numéricas
if len(num_cols) >= 2 and ANALYSIS["correlations"]["pearson"] is not None:
    corr = pd.DataFrame(ANALYSIS["correlations"]["pearson"])
    plt.figure(figsize=(max(6, 0.6*len(corr)+2), max(5, 0.6*len(corr)+2)))
    im = plt.imshow(corr.values, interpolation="nearest")
    plt.xticks(ticks=range(len(corr.columns)), labels=corr.columns, rotation=90)
    plt.yticks(ticks=range(len(corr.index)), labels=corr.index)
    plt.title("matriz de correlação (pearson)")
    plt.colorbar(im, fraction=0.046, pad=0.04)
    save_fig(IMG_DIR / "corr_heatmap.png")

#histogramas e boxplots por coluna numérica
for c in num_cols:
    x = to_float_ptbr_series(df_raw[c]).dropna()
    if x.empty:
        continue
    plt.figure(figsize=(6,4))
    plt.hist(x, bins=30)
    plt.title("histograma - {}".format(c))
    plt.xlabel(c); plt.ylabel("frequência")
    save_fig(IMG_DIR / "hist_{}.png".format(c))

    plt.figure(figsize=(4,4))
    plt.boxplot(x.values, vert=True, whis=1.5, showfliers=True)
    plt.title("boxplot - {}".format(c))
    plt.ylabel(c)
    save_fig(IMG_DIR / "box_{}.png".format(c))

#1) relatório TXT (agrupado por coluna)
txt_lines = []
shape = ANALYSIS["shape"]
txt_lines.append("arquivo: {}".format(SRC.name))
txt_lines.append("linhas (inclui cabeçalho): {}".format(shape["rows"]))
txt_lines.append("colunas: {}".format(shape["cols"]))
txt_lines.append("registros duplicados: {}".format(ANALYSIS["missingness"]["duplicates_rows"]))
txt_lines.append("")

for c in df_raw.columns:
    prof = ANALYSIS["profile"].get(c, {})
    kind = col_types.get(c)
    txt_lines.append("[coluna] {}".format(c))
    txt_lines.append("- tipo: {}".format(kind))
    if prof.get("semantic"):
        txt_lines.append("- tipo semântico: {}".format(prof["semantic"]))
    txt_lines.append("- nulos: {}".format(prof.get("nulls", 0)))
    txt_lines.append("- distintos (não vazios): {}".format(prof.get("distinct_nonnull", 0)))
    if prof.get("all_distinct"):
        txt_lines.append("- todos os dados são distintos")
    else:
        mf = prof.get("most_frequent"); lf = prof.get("least_frequent")
        if mf:
            txt_lines.append("- mais frequente: '{}' ({}, {:.2f}%)".format(mf["value"], mf["count"], mf["prop"]*100))
        if lf:
            txt_lines.append("- menos frequente: '{}' ({}, {:.2f}%)".format(lf["value"], lf["count"], lf["prop"]*100))
    if prof.get("length_stats"):
        ls = prof["length_stats"]
        txt_lines.append("- comprimentos: min={}, q1={:.1f}, mediana={:.1f}, q3={:.1f}, max={}".format(ls["min"], ls["q1"], ls["median"], ls["q3"], ls["max"]))
    if kind in ("int","float"):
        ed = ANALYSIS["eda"]["numeric"].get(c)
        if ed:
            txt_lines.append("- numéricos: min={}, q1={}, mediana={}, q3={}, max={}".format(ed["min"], ed["q1"], ed["median"], ed["q3"], ed["max"]))
            txt_lines.append("- média={}, desvio padrão={}, iqr={}".format(ed["mean"], ed["std"], ed["iqr"]))
            txt_lines.append("- outliers(IQR)={}, outliers(MAD)={}, skew={}, curtose(excesso)={}".format(ed["outliers_iqr"], ed["outliers_mad"], ed["skew"], ed["kurtosis_excess"]))
    elif kind == "datetime":
        dtc = ANALYSIS["eda"]["datetime"].get(c)
        if dtc:
            txt_lines.append("- formato detectado: {}".format(dtc.get("format")))
            txt_lines.append("- intervalo temporal: {} → {}".format(dtc["min"], dtc["max"]))
            txt_lines.append("- dias únicos: {}, média por dia: {:.2f}".format(dtc["unique_days"], dtc["mean_per_day"]))
    else:
        cat = ANALYSIS["eda"]["categorical"].get(c)
        if cat:
            txt_lines.append("- entropia de shannon: {:.4f}".format(cat["entropy_shannon"]))
            if not cat["all_distinct"]:
                topk = ", ".join(["'{}' ({})".format(t.get("value"), t.get("count")) for t in cat["top10"]])
                txt_lines.append("- top10: {}".format(topk))
    ben = ANALYSIS.get("benford", {}).get(c)
    if ben:
        txt_lines.append("- benford (primeiro dígito):")
        txt_lines.append("  • qui-quadrado={:.4f}, p-valor={}".format(ben.get("chi2_stat"), ben.get("p_value")))
    txt_lines.append("")

#salvar TXT
txt_path = RUN_DIR / "relatorio.txt"
with open(txt_path, "w", encoding="utf-8") as f:
    f.write("\n".join(txt_lines))

#2) relatório HTML com imagens embutidas
html = []
html.append("<html><head><meta charset='utf-8'><title>Relatório de Análise</title>")
html.append("<style>body{font-family:Arial,Helvetica,sans-serif;margin:20px}h1,h2,h3{margin:8px 0}table{border-collapse:collapse;margin:10px 0}th,td{border:1px solid #ccc;padding:6px 8px;font-size:13px}code{background:#f5f5f5;padding:0 4px}</style>")
html.append("</head><body>")
html.append("<h1>Relatório de Análise — {}</h1>".format(stamp))
html.append("<p>Arquivo analisado: <b>{}</b></p>".format(SRC.name))
html.append("<p><a href='relatorio.txt'>Baixar relatório TXT</a></p>")

#sumário
html.append("<h2>Sumário</h2>")
html.append("<ul>")
html.append("<li>Linhas (inclui cabeçalho): {}</li>".format(shape["rows"]))
html.append("<li>Colunas: {}</li>".format(shape["cols"]))
html.append("<li>Registros duplicados: {}</li>".format(ANALYSIS["missingness"]["duplicates_rows"]))
html.append("</ul>")

#ausências
miss_img = IMG_DIR / "missing_bar.png"
if miss_img.exists():
    html.append("<h2>Ausência de valores</h2>")
    html.append("<img src='{}' alt='missing bar'/>".format(img_to_data_uri(miss_img)))

#correlação
corr_img = IMG_DIR / "corr_heatmap.png"
if corr_img.exists():
    html.append("<h2>Matriz de correlação</h2>")
    html.append("<img src='{}' alt='corr heatmap'/>".format(img_to_data_uri(corr_img)))

#por coluna
html.append("<h2>Perfil por coluna</h2>")
for c in df_raw.columns:
    prof = ANALYSIS["profile"].get(c, {})
    kind = col_types.get(c)
    html.append("<h3>{}</h3>".format(c))
    html.append("<table>")
    html.append("<tr><th>Tipo</th><td>{}</td></tr>".format(kind))
    html.append("<tr><th>Nulos</th><td>{}</td></tr>".format(prof.get("nulls",0)))
    html.append("<tr><th>Distintos (não vazios)</th><td>{}</td></tr>".format(prof.get("distinct_nonnull",0)))
    if prof.get("semantic"):
        html.append("<tr><th>Tipo semântico</th><td>{}</td></tr>".format(prof["semantic"]))
    if prof.get("all_distinct"):
        html.append("<tr><th>Frequências</th><td>todos os dados são distintos</td></tr>")
    else:
        mf = prof.get("most_frequent"); lf = prof.get("least_frequent")
        freq_txt = []
        if mf:
            freq_txt.append("mais frequente: <code>{}</code> ({}, {:.2f}%)".format(str(mf["value"])[:120], mf["count"], mf["prop"]*100))
        if lf:
            freq_txt.append("menos frequente: <code>{}</code> ({}, {:.2f}%)".format(str(lf["value"])[:120], lf["count"], lf["prop"]*100))
        if freq_txt:
            html.append("<tr><th>Frequências</th><td>{}</td></tr>".format(" | ".join(freq_txt)))
    if prof.get("length_stats"):
        ls = prof["length_stats"]
        html.append("<tr><th>Comprimentos</th><td>min={}, q1={:.1f}, mediana={:.1f}, q3={:.1f}, max={}</td></tr>".format(ls["min"], ls["q1"], ls["median"], ls["q3"], ls["max"]))

    if kind in ("int","float"):
        ed = ANALYSIS["eda"]["numeric"].get(c)
        if ed:
            html.append("<tr><th>Estatísticas</th><td>min={}, q1={}, mediana={}, q3={}, max={}"
                        "<br/>média={}, desvio padrão={}, iqr={}"
                        "<br/>outliers(IQR)={}, outliers(MAD)={}, skew={}, curtose(excesso)={}</td></tr>".format(
                            ed["min"], ed["q1"], ed["median"], ed["q3"], ed["max"],
                            ed["mean"], ed["std"], ed["iqr"],
                            ed["outliers_iqr"], ed["outliers_mad"], ed["skew"], ed["kurtosis_excess"]
                        ))
        hist_p = IMG_DIR / "hist_{}.png".format(c)
        box_p  = IMG_DIR / "box_{}.png".format(c)
        figs = []
        if hist_p.exists(): figs.append("<img src='{}' alt='hist {}'/>".format(img_to_data_uri(hist_p), c))
        if box_p.exists():  figs.append("<img src='{}' alt='box {}'/>".format(img_to_data_uri(box_p), c))
        if figs:
            html.append("<tr><th>Gráficos</th><td>{}</td></tr>".format("<br/>".join(figs)))

    elif kind == "datetime":
        dtc = ANALYSIS["eda"]["datetime"].get(c)
        if dtc:
            html.append("<tr><th>Data/hora</th><td>formato detectado: {}<br/>intervalo: {} → {}<br/>dias únicos: {}, média por dia: {:.2f}</td></tr>".format(
                dtc.get("format"), dtc["min"], dtc["max"], dtc["unique_days"], dtc["mean_per_day"]
            ))

    else:
        cat = ANALYSIS["eda"]["categorical"].get(c)
        if cat:
            html.append("<tr><th>Entropia</th><td>{:.4f}</td></tr>".format(cat["entropy_shannon"]))
            if not cat["all_distinct"]:
                rows = "".join(["<tr><td>{}</td><td style='text-align:right'>{}</td></tr>".format(str(t["value"])[:120], t["count"]) for t in cat["top10"]])
                html.append("<tr><th>Top 10</th><td><table><tr><th>Valor</th><th>Contagem</th></tr>"+rows+"</table></td></tr>")

    ben = ANALYSIS.get("benford", {}).get(c)
    if ben:
        html.append("<tr><th>Benford</th><td>qui-quadrado={:.4f}, p-valor={}</td></tr>".format(ben.get("chi2_stat"), ben.get("p_value")))

    html.append("</table>")

#fds/cfds/dcs
html.append("<h2>Regras sugeridas</h2>")
if ANALYSIS["fds"]:
    html.append("<h3>FDs</h3><ul>")
    for r in ANALYSIS["fds"]:
        if r.get("key"):
            html.append("<li>chave candidata: {}</li>".format(", ".join(r["determinant"])))
        else:
            html.append("<li>{} → {} (100%)</li>".format(", ".join(r["determinant"]), r["implies"]))
    html.append("</ul>")
if ANALYSIS["cfds"]:
    html.append("<h3>CFDs (aproximadas)</h3><ul>")
    for r in ANALYSIS["cfds"][:200]:
        html.append("<li>{} → {} ({:.2f}%)</li>".format(", ".join(r["determinant"]), r["implies"], r["coverage"]*100))
    html.append("</ul>")
if ANALYSIS["denial_constraints_suggested"]:
    html.append("<h3>Denial constraints</h3><ul>")
    for d in ANALYSIS["denial_constraints_suggested"]:
        html.append("<li>{} — violações: {}</li>".format(d["constraint"], d["violations"]))
    html.append("</ul>")

html.append("</body></html>")

#salvar HTML
html_path = RUN_DIR / "relatorio.html"
with open(html_path, "w", encoding="utf-8") as f:
    f.write("\n".join(html))

#4) pdf com as MESMAS infos (tabelas por coluna + imagens + regras)
pdf_path = RUN_DIR / "relatorio.pdf"
if REPORTLAB_OK:
    styles = getSampleStyleSheet()
    styles.add(ParagraphStyle(name="Small", parent=styles["Normal"], fontSize=9, leading=11))
    table_style = TableStyle([
        ("GRID", (0,0), (-1,-1), 0.5, colors.grey),
        ("BACKGROUND", (0,0), (-1,0), colors.HexColor("#f0f0f0")),
        ("VALIGN", (0,0), (-1,-1), "TOP"),
        ("LEFTPADDING", (0,0), (-1,-1), 6),
        ("RIGHTPADDING", (0,0), (-1,-1), 6),
        ("TOPPADDING", (0,0), (-1,-1), 4),
        ("BOTTOMPADDING", (0,0), (-1,-1), 4),
    ])

    def table_kv(rows):
        #rows: list of (key, value(str))
        data = [["Campo","Valor"]] + rows
        t = Table(data, colWidths=[120, 360])
        t.setStyle(table_style)
        return t

    story = []
    story.append(Paragraph("Relatório de Análise — {}".format(stamp), styles["Title"]))
    story.append(Spacer(1, 12))
    story.append(Paragraph("Arquivo analisado: <b>{}</b>".format(SRC.name), styles["Normal"]))
    story.append(Paragraph("Linhas: {} &nbsp;&nbsp; Colunas: {}".format(shape["rows"], shape["cols"]), styles["Normal"]))
    story.append(Paragraph("Registros duplicados: {}".format(ANALYSIS["missingness"]["duplicates_rows"]), styles["Normal"]))
    story.append(Spacer(1, 10))

    #ausências
    miss_img = IMG_DIR / "missing_bar.png"
    if miss_img.exists():
        story.append(Paragraph("Ausência de valores por coluna", styles["Heading2"]))
        story.append(RLImage(str(miss_img), width=480, height=320))
        story.append(Spacer(1, 8))

    #correlação
    corr_img = IMG_DIR / "corr_heatmap.png"
    if corr_img.exists():
        story.append(Paragraph("Matriz de correlação (Pearson)", styles["Heading2"]))
        story.append(RLImage(str(corr_img), width=480, height=320))
        story.append(Spacer(1, 8))

    #por coluna: tabela completa com as MESMAS infos do HTML/TXT
    for c in df_raw.columns:
        prof = ANALYSIS["profile"].get(c, {})
        kind = col_types.get(c)
        story.append(Paragraph("Coluna: {}".format(c), styles["Heading3"]))

        rows = []
        rows.append(["Tipo", str(kind)])
        rows.append(["Nulos", str(prof.get("nulls",0))])
        rows.append(["Distintos (não vazios)", str(prof.get("distinct_nonnull",0))])
        if prof.get("semantic"):
            rows.append(["Tipo semântico", str(prof["semantic"])])

        if prof.get("all_distinct"):
            rows.append(["Frequências", "todos os dados são distintos"])
        else:
            mf = prof.get("most_frequent"); lf = prof.get("least_frequent")
            freq_parts = []
            if mf:
                freq_parts.append("mais frequente: '{}' ({}, {:.2f}%)".format(mf["value"], mf["count"], mf["prop"]*100))
            if lf:
                freq_parts.append("menos frequente: '{}' ({}, {:.2f}%)".format(lf["value"], lf["count"], lf["prop"]*100))
            if freq_parts:
                rows.append(["Frequências", " | ".join(freq_parts)])

        if prof.get("length_stats"):
            ls = prof["length_stats"]
            rows.append(["Comprimentos", "min={}, q1={:.1f}, mediana={:.1f}, q3={:.1f}, max={}".format(
                ls["min"], ls["q1"], ls["median"], ls["q3"], ls["max"]
            )])

        #estatísticas por tipo
        if kind in ("int","float"):
            ed = ANALYSIS["eda"]["numeric"].get(c)
            if ed:
                rows.append(["Estatísticas", "min={}, q1={}, mediana={}, q3={}, max={}\nmédia={}, desvio padrão={}, iqr={}\noutliers(IQR)={}, outliers(MAD)={}, skew={}, curtose(excesso)={}".format(
                    ed["min"], ed["q1"], ed["median"], ed["q3"], ed["max"],
                    ed["mean"], ed["std"], ed["iqr"],
                    ed["outliers_iqr"], ed["outliers_mad"], ed["skew"], ed["kurtosis_excess"]
                )])
            hist_p = IMG_DIR / "hist_{}.png".format(c)
            box_p  = IMG_DIR / "box_{}.png".format(c)
            if hist_p.exists():
                rows.append(["Histograma", "ver imagem abaixo"])
            if box_p.exists():
                rows.append(["Boxplot", "ver imagem abaixo"])

        elif kind == "datetime":
            dtc = ANALYSIS["eda"]["datetime"].get(c)
            if dtc:
                rows.append(["Data/hora", "formato detectado: {}\nintervalo: {} → {}\ndias únicos: {}, média por dia: {:.2f}".format(
                    dtc.get("format"), dtc["min"], dtc["max"], dtc["unique_days"], dtc["mean_per_day"]
                )])

        else:
            cat = ANALYSIS["eda"]["categorical"].get(c)
            if cat:
                rows.append(["Entropia", "{:.4f}".format(cat["entropy_shannon"])])
                if not cat["all_distinct"]:
                    #tabela interna de top10
                    top_rows = [["Valor","Contagem"]] + [[str(t.get("value"))[:120], str(t.get("count"))] for t in cat["top10"]]
                    ttop = Table(top_rows, colWidths=[360, 120])
                    ttop.setStyle(TableStyle([
                        ("GRID", (0,0), (-1,-1), 0.5, colors.grey),
                        ("BACKGROUND", (0,0), (-1,0), colors.HexColor("#f7f7f7")),
                        ("VALIGN", (0,0), (-1,-1), "TOP"),
                    ]))
                    #primeiro empurramos um placeholder e depois inserimos a tabela como bloco
                    rows.append(["Top 10", "tabela abaixo"])
                    story.append(table_kv(rows))
                    story.append(Spacer(1, 4))
                    story.append(ttop)
                    rows = []  #limpa para não duplicar em table_kv abaixo

        #benford
        ben = ANALYSIS.get("benford", {}).get(c)
        if ben:
            rows.append(["Benford", "qui-quadrado={:.4f}, p-valor={}".format(ben.get("chi2_stat"), ben.get("p_value"))])

        if rows:
            story.append(table_kv(rows))
            story.append(Spacer(1, 6))

        #imagens específicas da coluna
        if kind in ("int","float"):
            hist_p = IMG_DIR / "hist_{}.png".format(c)
            box_p  = IMG_DIR / "box_{}.png".format(c)
            if hist_p.exists():
                story.append(RLImage(str(hist_p), width=480, height=320))
                story.append(Spacer(1, 4))
            if box_p.exists():
                story.append(RLImage(str(box_p), width=320, height=320))
                story.append(Spacer(1, 6))

        story.append(Spacer(1, 6))

    #regras sugeridas (FDs/CFDs/DCs) como tabelas/listas
    story.append(PageBreak())
    story.append(Paragraph("Regras sugeridas", styles["Heading2"]))

    if ANALYSIS["fds"]:
        rows = [["Determinante", "Implicado/Chave", "Cobertura"]]
        for r in ANALYSIS["fds"]:
            if r.get("key"):
                rows.append([", ".join(r["determinant"]), "chave candidata", "100%"])
            else:
                rows.append([", ".join(r["determinant"]), r["implies"], "100%"])
        tfds = Table(rows, colWidths=[220, 180, 80])
        tfds.setStyle(table_style)
        story.append(Paragraph("FDs", styles["Heading3"]))
        story.append(tfds)
        story.append(Spacer(1, 8))

    if ANALYSIS["cfds"]:
        rows = [["Determinante", "Implicado", "Cobertura"]]
        for r in ANALYSIS["cfds"][:500]:
            rows.append([", ".join(r["determinant"]), r["implies"], "{:.2f}%".format(r["coverage"]*100)])
        tcfds = Table(rows, colWidths=[220, 180, 80])
        tcfds.setStyle(table_style)
        story.append(Paragraph("CFDs (aproximadas)", styles["Heading3"]))
        story.append(tcfds)
        story.append(Spacer(1, 8))

    if ANALYSIS["denial_constraints_suggested"]:
        rows = [["Regra (DC)", "Violações"]]
        for d in ANALYSIS["denial_constraints_suggested"]:
            rows.append([d["constraint"], str(d["violations"])])
        tdcs = Table(rows, colWidths=[360, 120])
        tdcs.setStyle(table_style)
        story.append(Paragraph("Denial constraints", styles["Heading3"]))
        story.append(tdcs)

    doc = SimpleDocTemplate(str(pdf_path), pagesize=A4, leftMargin=24, rightMargin=24, topMargin=24, bottomMargin=24)
    doc.build(story)
else:
    print("reportlab não disponível; pulei a geração do PDF. instale reportlab e reexecute a etapa [6].")

print("relatórios gerados em: {}".format(RUN_DIR))
print("- TXT: relatorio.txt")
print("- HTML: relatorio.html (imagens embutidas)")
print("- PNGs: subpasta imagens/")
print("- PDF: {}".format("relatorio.pdf" if REPORTLAB_OK else "(não gerado — instale reportlab)"))