# Multiagente Extractor (PDF/Imagem)
Este módulo cria:
- Tools de extração (PDF texto, PDF OCR, Imagem OCR)
- Heurística para detectar PDF "scanned" (sem texto real)
- 3 agentes:
  - RouterAgent: decide o caminho (extensão + scanned)
  - TextPDF_Agent: tenta extrair texto do PDF
  - OCR_Agent: faz OCR (PDF scanned ou imagem)
- 1 função `run_extraction(path)` para usar como pipeline


In [1]:
# Célula 1 - Inicialização
import os, json, time
from pathlib import Path
from typing import Dict, Any, List

DADOS_DIR = Path(r"C:\Users\fepac\Unicamp_Project\dados")
assert DADOS_DIR.exists(), f"Pasta não existe: {DADOS_DIR}"
print("OK:", DADOS_DIR)

OUT_DIR = DADOS_DIR / "out_extract"
OUT_DIR.mkdir(parents=True, exist_ok=True)

TRACE_JSONL = OUT_DIR / "trace_log.jsonl"
TRACE_MD    = OUT_DIR / "trace_pretty.md"

# limpa logs
for p in [TRACE_JSONL, TRACE_MD]:
    if p.exists():
        p.unlink()

def trace_event(event: Dict[str, Any]) -> None:
    event["ts"] = time.strftime("%Y-%m-%d %H:%M:%S")
    TRACE_JSONL.open("a", encoding="utf-8").write(json.dumps(event, ensure_ascii=False) + "\n")

def trace_md(line: str) -> None:
    TRACE_MD.open("a", encoding="utf-8").write(line.rstrip() + "\n")

print("OK: OUT_DIR =", OUT_DIR)
print("OK: TRACE_JSONL =", TRACE_JSONL)
print("OK: TRACE_MD =", TRACE_MD)


OK: C:\Users\fepac\Unicamp_Project\dados
OK: OUT_DIR = C:\Users\fepac\Unicamp_Project\dados\out_extract
OK: TRACE_JSONL = C:\Users\fepac\Unicamp_Project\dados\out_extract\trace_log.jsonl
OK: TRACE_MD = C:\Users\fepac\Unicamp_Project\dados\out_extract\trace_pretty.md


In [2]:
# Célula 2 - ToolResponse + Toolkit
from agentscope.tool import ToolResponse, Toolkit

try:
    toolkit
except NameError:
    toolkit = Toolkit()

def _resp_text(text: str, **meta) -> ToolResponse:
    return ToolResponse(
        content=[{"type": "text", "text": text}],
        metadata=meta or None,
        stream=False,
        is_last=True,
        is_interrupted=False,
        id=None,
    )

print("OK: toolkit pronto")


OK: toolkit pronto


In [3]:
# Célula 3 - Model (CORRIGIDA: base_url vai em client_kwargs)
import os
from dotenv import load_dotenv
from agentscope.model import OpenAIChatModel

ENV_PATH = r"C:\Users\fepac\Unicamp_Project\.env"
load_dotenv(ENV_PATH, override=True)

GROQ_API_KEY  = os.getenv("GROQ_API_KEY")
GROQ_BASE_URL = os.getenv("GROQ_BASE_URL", "https://api.groq.com/openai/v1")
GROQ_MODEL    = os.getenv("GROQ_MODEL", "llama-3.3-70b-versatile")

assert GROQ_API_KEY, "GROQ_API_KEY não carregou do .env"
assert GROQ_BASE_URL, "GROQ_BASE_URL não carregou do .env"

model = OpenAIChatModel(
    model_name=GROQ_MODEL,
    api_key=GROQ_API_KEY,
    client_kwargs={"base_url": GROQ_BASE_URL},  # <<< FIX DEFINITIVO
    stream=True,
)

print("OK: model criado")
print("GROQ_BASE_URL:", GROQ_BASE_URL)
print("GROQ_MODEL:", GROQ_MODEL)
print("GROQ_API_KEY prefix:", GROQ_API_KEY[:4], "len:", len(GROQ_API_KEY))


OK: model criado
GROQ_BASE_URL: https://api.groq.com/openai/v1
GROQ_MODEL: llama-3.3-70b-versatile
GROQ_API_KEY prefix: gsk_ len: 56


In [4]:
# Célula 4 - Heurística PDF scanned
import pdfplumber
from pypdf import PdfReader

def _pdf_text_stats(pdf_path: str, max_pages: int = 3) -> Dict[str, Any]:
    p = Path(pdf_path)
    if not p.exists():
        return {"ok": False, "error": "Arquivo não existe"}

    text_chars = 0
    pages_checked = 0
    with pdfplumber.open(str(p)) as pdf:
        for page in pdf.pages[:max_pages]:
            pages_checked += 1
            t = page.extract_text() or ""
            text_chars += len(t.strip())

    img_pages = 0
    warn = None
    try:
        reader = PdfReader(str(p))
        for i in range(min(max_pages, len(reader.pages))):
            page = reader.pages[i]
            resources = page.get("/Resources") or {}
            xobj = resources.get("/XObject") if hasattr(resources, "get") else None
            has_img = False
            if xobj:
                xobj = xobj.get_object()
                for _, obj in xobj.items():
                    o = obj.get_object()
                    if o.get("/Subtype") == "/Image":
                        has_img = True
                        break
            if has_img:
                img_pages += 1
    except Exception as e:
        warn = f"Falha pypdf ao inspecionar imagens: {e}"

    likely_scanned = (text_chars < 50 and img_pages > 0) if warn is None else (text_chars < 50)

    return {
        "ok": True,
        "pages_checked": pages_checked,
        "text_chars": text_chars,
        "img_pages": img_pages if warn is None else None,
        "likely_scanned": likely_scanned,
        "warn": warn,
    }

print("OK: _pdf_text_stats pronto")


OK: _pdf_text_stats pronto


In [5]:
# Célula 5 - Tools PDF
@toolkit.register_tool_function
def pdf_is_scanned(pdf_path: str) -> ToolResponse:
    trace_event({"type": "tool_start", "tool": "pdf_is_scanned", "pdf_path": pdf_path})
    t0 = time.time()
    stats = _pdf_text_stats(pdf_path, max_pages=3)
    trace_event({"type": "tool_end", "tool": "pdf_is_scanned", "elapsed_s": round(time.time()-t0, 3), "stats": stats})
    trace_md(f"- tool pdf_is_scanned | {Path(pdf_path).name} | {stats}")
    return _resp_text(str(stats), stats=stats)

@toolkit.register_tool_function
def extract_pdf_text(pdf_path: str) -> ToolResponse:
    trace_event({"type": "tool_start", "tool": "extract_pdf_text", "pdf_path": pdf_path})
    t0 = time.time()

    p = Path(pdf_path)
    if not p.exists():
        trace_event({"type": "tool_end", "tool": "extract_pdf_text", "success": False, "error": "Arquivo não existe"})
        return _resp_text("Arquivo não existe.", success=False)

    texts = []
    with pdfplumber.open(str(p)) as pdf:
        for page in pdf.pages:
            t = page.extract_text() or ""
            if t.strip():
                texts.append(t)

    out = "\n\n".join(texts).strip()
    success = bool(out)

    trace_event({"type": "tool_end", "tool": "extract_pdf_text", "elapsed_s": round(time.time()-t0, 3), "success": success, "chars": len(out)})
    trace_md(f"- tool extract_pdf_text | {p.name} | success={success} chars={len(out)}")

    if not out:
        return _resp_text("Nenhum texto extraído via pdfplumber.", success=False)
    return _resp_text(out, success=True)

print("OK: tools PDF registradas")


OK: tools PDF registradas


In [6]:
# Célula 6 - Tools OCR
from PIL import Image
import urllib.request
import pytesseract
from pdf2image import convert_from_path

TESSERACT_CMD = r"C:\Program Files\Tesseract-OCR\tesseract.exe"  # ou None
TESSDATA_DIR  = Path(r"C:\Program Files\Tesseract-OCR\tessdata")
DEFAULT_LANG  = "por"

if TESSERACT_CMD and Path(TESSERACT_CMD).exists():
    pytesseract.pytesseract.tesseract_cmd = TESSERACT_CMD

def _ensure_tess_lang(lang: str, tessdata_dir: Path = TESSDATA_DIR) -> str:
    tessdata_dir.mkdir(parents=True, exist_ok=True)
    trained = tessdata_dir / f"{lang}.traineddata"
    if trained.exists():
        return lang
    if lang == "por":
        url = "https://github.com/UB-Mannheim/tesseract/raw/main/tessdata/por.traineddata"
        try:
            trace_event({"type": "tess_lang_download_start", "lang": "por", "dest": str(trained)})
            urllib.request.urlretrieve(url, str(trained))
            trace_event({"type": "tess_lang_download_end", "lang": "por", "dest": str(trained), "success": True})
            return "por"
        except Exception as e:
            trace_event({"type": "tess_lang_download_end", "lang": "por", "dest": str(trained), "success": False, "error": str(e)})
            return "eng"
    return "eng"

@toolkit.register_tool_function
def ocr_image(image_path: str, lang: str = DEFAULT_LANG, psm: int = 6, oem: int = 3) -> ToolResponse:
    trace_event({"type": "tool_start", "tool": "ocr_image", "image_path": image_path, "lang": lang})
    t0 = time.time()

    p = Path(image_path)
    if not p.exists():
        trace_event({"type": "tool_end", "tool": "ocr_image", "success": False, "error": "Arquivo não existe"})
        return _resp_text("Arquivo não existe.", success=False)

    lang_ok = _ensure_tess_lang(lang)
    try:
        img = Image.open(str(p)).convert("RGB")
        cfg = f"--oem {oem} --psm {psm}"
        text = pytesseract.image_to_string(img, lang=lang_ok, config=cfg).strip()
        success = bool(text)

        trace_event({"type": "tool_end", "tool": "ocr_image", "elapsed_s": round(time.time()-t0, 3), "success": success, "lang_used": lang_ok, "chars": len(text)})
        trace_md(f"- tool ocr_image | {p.name} | success={success} lang={lang_ok} chars={len(text)}")

        if not text:
            return _resp_text("OCR não retornou texto.", success=False, lang_used=lang_ok)
        return _resp_text(text, success=True, lang_used=lang_ok)
    except Exception as e:
        trace_event({"type": "tool_end", "tool": "ocr_image", "success": False, "error": str(e)})
        return _resp_text(f"Erro no OCR da imagem: {e}", success=False, lang_used=lang_ok)

@toolkit.register_tool_function
def ocr_pdf(pdf_path: str, lang: str = DEFAULT_LANG, first_n_pages: int = 5, dpi: int = 250, psm: int = 6, oem: int = 3) -> ToolResponse:
    trace_event({"type": "tool_start", "tool": "ocr_pdf", "pdf_path": pdf_path, "lang": lang, "first_n_pages": first_n_pages, "dpi": dpi})
    t0 = time.time()

    p = Path(pdf_path)
    if not p.exists():
        trace_event({"type": "tool_end", "tool": "ocr_pdf", "success": False, "error": "Arquivo não existe"})
        return _resp_text("Arquivo não existe.", success=False)

    lang_ok = _ensure_tess_lang(lang)

    try:
        pages = convert_from_path(str(p), dpi=dpi, first_page=1, last_page=first_n_pages)
    except Exception as e:
        trace_event({"type": "tool_end", "tool": "ocr_pdf", "success": False, "error": str(e)})
        return _resp_text(
            "Falha ao converter PDF em imagens (pdf2image). Instale Poppler e coloque no PATH. "
            f"Detalhe: {e}",
            success=False,
        )

    cfg = f"--oem {oem} --psm {psm}"
    chunks = []
    try:
        for i, pil_img in enumerate(pages, start=1):
            txt = pytesseract.image_to_string(pil_img.convert('RGB'), lang=lang_ok, config=cfg).strip()
            if txt:
                chunks.append(f"--- page {i} ---\n{txt}")

        out = "\n\n".join(chunks).strip()
        success = bool(out)

        trace_event({"type": "tool_end", "tool": "ocr_pdf", "elapsed_s": round(time.time()-t0, 3), "success": success, "lang_used": lang_ok, "pages": len(pages), "chars": len(out)})
        trace_md(f"- tool ocr_pdf | {p.name} | success={success} lang={lang_ok} pages={len(pages)} chars={len(out)}")

        if not out:
            return _resp_text("OCR no PDF não retornou texto.", success=False, lang_used=lang_ok)
        return _resp_text(out, success=True, lang_used=lang_ok)

    except Exception as e:
        trace_event({"type": "tool_end", "tool": "ocr_pdf", "success": False, "error": str(e)})
        return _resp_text(f"Erro no OCR do PDF: {e}", success=False, lang_used=lang_ok)

print("OK: tools OCR registradas")


OK: tools OCR registradas


In [7]:
# Célula 7 - RouterAgent (MODIFICADA: troca OpenAIMultiAgentFormatter -> OpenAIChatFormatter)
from agentscope.agent import ReActAgent
from agentscope.memory import InMemoryMemory
from agentscope.formatter import OpenAIChatFormatter  # <<< AQUI
from agentscope.tool import ToolResponse

formatter = OpenAIChatFormatter()  # <<< AQUI

@toolkit.register_tool_function
def trace_decision(file_path: str, decision: str, extra: str = "") -> ToolResponse:
    trace_event({"type": "agent_decision", "agent": "RouterAgent", "file": file_path, "decision": decision, "extra": extra})
    trace_md(f"## decision | {Path(file_path).name}\n- decision: {decision}\n- extra: {extra}\n")
    return _resp_text("decision logged", success=True)

RouterAgent = ReActAgent(
    name="RouterAgent",
    sys_prompt=(
        "Você é um roteador de extração.\n\n"
        "Você DEVE seguir este fluxo e registrar decisão chamando a tool trace_decision:\n"
        "1) Descubra a extensão do arquivo (use o file_path do usuário).\n"
        "2) Se for imagem: chame trace_decision(file_path,'image->ocr_image') e então chame ocr_image.\n"
        "3) Se for PDF: chame pdf_is_scanned.\n"
        "   - Se likely_scanned=True: chame trace_decision(file_path,'pdf_scanned->ocr_pdf') e então ocr_pdf.\n"
        "   - Caso contrário: tente extract_pdf_text.\n"
        "       - Se extract_pdf_text retornar success=False: chame trace_decision(file_path,'pdf_text_fail->ocr_pdf') e então ocr_pdf.\n"
        "       - Se success=True: chame trace_decision(file_path,'pdf_text->extract_pdf_text').\n\n"
        "Saída: devolva APENAS o texto extraído (sem explicações longas)."
    ),
    model=model,
    formatter=formatter,
    memory=InMemoryMemory(),
    toolkit=toolkit,
    max_iters=20,
)

print("OK: RouterAgent recriado com OpenAIChatFormatter")


OK: RouterAgent recriado com OpenAIChatFormatter


In [8]:
# Célula 8 - Batch runner 
from pathlib import Path
from agentscope.message import Msg   # <<< FALTAVA ISSO

SUPPORTED_IMAGES = {".png", ".jpg", ".jpeg", ".webp", ".bmp", ".tif", ".tiff"}
SUPPORTED_PDFS   = {".pdf"}
SUPPORTED_EXTS   = SUPPORTED_IMAGES | SUPPORTED_PDFS

def list_supported_files(data_dir: Path) -> list[Path]:
    return sorted([p for p in data_dir.rglob("*") if p.is_file() and p.suffix.lower() in SUPPORTED_EXTS])

def build_extract_msg(file_path: Path, lang: str = "por") -> Msg:
    return Msg(
        name="user",
        role="user",
        content=(
            "Faça a extração do conteúdo do arquivo abaixo.\n"
            "Regras: se for PDF, verifique se é scanned; use OCR quando necessário.\n"
            f"Arquivo: {str(file_path)}\n"
            f"Lang: {lang}\n"
            "Retorne o texto extraído (sem inventar conteúdo)."
        ),
    )

async def extract_one_with_router(file_path: Path, lang: str = "por") -> str:
    msg = build_extract_msg(file_path, lang=lang)
    reply = await RouterAgent.reply(msg)
    return reply.get_text_content()

async def run_folder_extraction(
    data_dir: Path,
    out_dir: Path,
    lang: str = "por",
    limit: int = 0,
) -> Path:
    out_dir.mkdir(parents=True, exist_ok=True)

    files = list_supported_files(data_dir)
    if limit and limit > 0:
        files = files[:limit]

    print(f"Arquivos suportados encontrados: {len(files)}")
    for p in files:
        print(" -", p.name)

    log_rows = ["file,status,output_or_error"]
    for p in files:
        print("\n=== EXTRAINDO:", p.name, "===")
        try:
            text = await extract_one_with_router(p, lang=lang)
            out_txt = out_dir / f"{p.stem}.txt"
            out_txt.write_text(text, encoding="utf-8", errors="ignore")
            log_rows.append(f"{p.name},OK,{out_txt}")
            print("OK ->", out_txt)
        except Exception as e:
            log_rows.append(f"{p.name},ERR,{str(e).replace(',', ';')}")
            print("ERRO:", e)

    log_path = out_dir / "batch_log.csv"
    log_path.write_text("\n".join(log_rows), encoding="utf-8", errors="ignore")
    print("\nLOG salvo em:", log_path)
    return log_path


In [9]:
# Célula 9 - Executar (Jupyter)
log_path = await run_folder_extraction(DADOS_DIR, OUT_DIR, lang="por", limit=0)
print("Finalizado. Log:", log_path)
print("Trace JSONL:", TRACE_JSONL)
print("Trace MD:", TRACE_MD)


Arquivos suportados encontrados: 2
 - nota2.png
 - relatorio_financeiro.pdf

=== EXTRAINDO: nota2.png ===
ERRO: Failed to call a function. Please adjust your prompt. See 'failed_generation' for more details.

=== EXTRAINDO: relatorio_financeiro.pdf ===
RouterAgent: {
    "type": "tool_use",
    "id": "0j4tyqm4t",
    "name": "trace_decision",
    "input": {
        "decision": "pdf_scanned->ocr_pdf",
        "extra": "",
        "file_path": "C:\\\\Users\\\\fepac\\\\Unicamp_Project\\\\dados\\\\relatorio_financeiro.pdf"
    }
}
RouterAgent: {
    "type": "tool_use",
    "id": "jb6xh5hw0",
    "name": "pdf_is_scanned",
    "input": {
        "pdf_path": "C:\\\\Users\\\\fepac\\\\Unicamp_Project\\\\dados\\\\relatorio_financeiro.pdf"
    }
}
RouterAgent: {
    "type": "tool_use",
    "id": "sszq0v6rw",
    "name": "ocr_pdf",
    "input": {
        "dpi": 250,
        "first_n_pages": 5,
        "lang": "por",
        "oem": 3,
        "pdf_path": "C:\\\\Users\\\\fepac\\\\Unicamp_Project\\\\

In [10]:
# Célula 10 - Sumário do trace (por arquivo)
import json
from collections import defaultdict

def summarize_trace(path: Path):
    events = []
    for ln in path.read_text(encoding="utf-8", errors="ignore").splitlines():
        try:
            events.append(json.loads(ln))
        except Exception:
            pass

    per_file = defaultdict(lambda: {"decisions": [], "tools": [], "agent_elapsed": None, "chars": None, "errors": []})

    for e in events:
        f = e.get("file") or e.get("pdf_path") or e.get("image_path")
        if not f:
            continue
        f = str(f)

        if e.get("type") == "agent_decision":
            per_file[f]["decisions"].append(e.get("decision"))

        if e.get("type") == "tool_end":
            per_file[f]["tools"].append({
                "tool": e.get("tool"),
                "success": e.get("success", None),
                "elapsed_s": e.get("elapsed_s", None),
                "chars": e.get("chars", None),
            })

        if e.get("type") == "agent_call_end":
            per_file[f]["agent_elapsed"] = e.get("elapsed_s")
            per_file[f]["chars"] = e.get("chars")

        if e.get("type") in ["file_error"]:
            per_file[f]["errors"].append(e.get("error"))

    print("=== SUMMARY ===")
    for f, info in per_file.items():
        name = Path(f).name
        print(f"\n{name}")
        print("  decisions:", info["decisions"])
        print("  tools:", info["tools"])
        print("  agent_elapsed:", info["agent_elapsed"], "chars:", info["chars"])
        if info["errors"]:
            print("  errors:", info["errors"])

summarize_trace(TRACE_JSONL)


=== SUMMARY ===

relatorio_financeiro.pdf
  decisions: ['pdf_scanned->ocr_pdf', 'pdf_text->extract_pdf_text']
  tools: []
  agent_elapsed: None chars: None
