# Sistema Jur√≠dico RAG Avan√ßado (Notebook Demo)

Este notebook apresenta uma implementa√ß√£o de refer√™ncia baseada na arquitetura de 4 camadas descrita na documenta√ß√£o t√©cnica. Ele foca em:
- Extra√ß√£o estruturada (Docling)
- Indexa√ß√£o baseada em racioc√≠nio (PageIndex)
- Gest√£o de contexto conversacional (ChatIndex)
- Persist√™ncia audit√°vel (Google Drive)

Inclui tamb√©m um chat jur√≠dico robusto com rastreabilidade e logs de auditoria.


## Depend√™ncias e configura√ß√£o

Este notebook foi estruturado para rodar localmente ou em ambientes como Colab. Ajuste os caminhos e chaves conforme necess√°rio.

In [None]:
# Instala√ß√£o de depend√™ncias avan√ßadas (execute apenas uma vez)
%pip install -q langchain langchain-openai langchain-groq langgraph chromadb pypdf sentence-transformers faiss-cpu
%pip install -q "mcp[cli]" mcp[openai] nest-asyncio python-magic pdfplumber
%pip install -q beautifulsoup4 requests playwright lxml html2text
%pip install -q pymupdf pytesseract pillow
!apt-get update && apt-get install -y poppler-utils tesseract-ocr-por > /dev/null
!playwright install --with-deps chromium
print("‚úÖ Pacotes instalados com sucesso!")


In [None]:
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime
import os
from pathlib import Path
from typing import Any, Dict, List, Optional
import asyncio
import requests
from docling.document_converter import DocumentConverter
from playwright.async_api import async_playwright
import hashlib
import json
from langchain_groq import ChatGroq
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# Configura√ß√£o base (ajuste conforme seu ambiente)
@dataclass(frozen=True)
class ConfigSistema:
    pageindex_api_url: str = "https://api.pageindex.ai/v1/index"
    pageindex_api_key: str = "SUA_CHAVE_AQUI"
    chatindex_dir: Path = Path("./chatindex")
    drive_root: Path = Path("./Juridico_Unificado")
    audit_dir: Path = Path("./Juridico_Unificado/05_Auditoria")
    cache_dir: Path = Path("./cache_juridico")
    mcp_servers_dir: Path = Path("./Juridico_Unificado/04_Integracoes/mcp_servers")
    versao_fonte: str = "v1"
    groq_api_key: str = os.environ.get("GROQ_API_KEY", "")
    groq_model_analise: str = "llama-3.1-8b-instant"
    groq_model_resposta: str = "llama-3.1-70b-versatile"

config = ConfigSistema()
config

In [None]:
# Configura√ß√£o de API Keys (preencha antes de usar LLMs)
os.environ.setdefault("GROQ_API_KEY", "SUA_CHAVE_AQUI")


## üéØ Como usar o sistema

1. **Processar documentos:**
   ```python
   resultado = sistema.processar_documento(Path("seu_documento.pdf"))
   ```
2. **Consultar o sistema:**
   ```python
   resposta = sistema.responder_consulta("sua pergunta jur√≠dica", {"fontes": []})
   ```
3. **Gerar relat√≥rios:**
   ```python
   relatorio = sistema.auditoria.registrar_evento("relatorios", {"tipo": "snapshot"})
   ```
4. **Acessar auditoria:**
   ```python
   sistema.auditoria.registrar_evento("consultas", {"tipo": "consulta", "consulta": "exemplo"})
   ```

### üîß Configura√ß√µes importantes
- **API Keys:** configure `GROQ_API_KEY` para respostas com LLM.
- **Fontes:** scraping inclui STF, Planalto e STJ.
- **Armazenamento:** tudo √© salvo no Google Drive de forma estruturada.
- **Chunking:** preserva estrutura jur√≠dica e evita quebras inadequadas.

### üöÄ Pr√≥ximos passos
1. Configurar API do PageIndex para indexa√ß√£o em √°rvore real.
2. Implementar servidores MCP locais para melhor integra√ß√£o.
3. Adicionar mais fontes oficiais de scraping.
4. Implementar cache distribu√≠do para melhor performance.
5. Adicionar dashboard de monitoramento.

### üìÅ Estrutura no Google Drive
```text
Juridico_Unificado/
‚îú‚îÄ‚îÄ 01_PageIndex/          # √Årvores de documentos
‚îú‚îÄ‚îÄ 02_ChatIndex/         # Hist√≥rico de conversas
‚îú‚îÄ‚îÄ 03_Docling_Output/    # Extra√ß√µes estruturadas
‚îÇ   ‚îú‚îÄ‚îÄ raw_extractions/  # Extra√ß√µes brutas
‚îÇ   ‚îú‚îÄ‚îÄ structured_outputs/ # Extra√ß√µes processadas
‚îÇ   ‚îî‚îÄ‚îÄ chunks_semanticos/ # Chunks preservados
‚îú‚îÄ‚îÄ 04_Integracoes/       # Dados cruzados
‚îú‚îÄ‚îÄ 05_Auditoria/         # Logs e rastreabilidade
‚îÇ   ‚îú‚îÄ‚îÄ log_central.jsonl # Log principal
‚îÇ   ‚îú‚îÄ‚îÄ relatorios/       # Relat√≥rios gerados
‚îÇ   ‚îî‚îÄ‚îÄ consultas/        # Hist√≥rico de consultas
‚îî‚îÄ‚îÄ indice_documentos.json # √çndice global
```

### ‚ö†Ô∏è Notas importantes
- O scraping de fontes reais requer ajustes para sites espec√≠ficos.
- A integra√ß√£o completa com PageIndex MCP requer servidor em execu√ß√£o.
- Configure `GROQ_API_KEY` para respostas mais inteligentes.
- Documentos muito grandes podem exigir ajustes no chunking.

‚úÖ O sistema est√° pronto para processar documentos jur√≠dicos com:
- Extra√ß√£o estrutural com Docling
- Chunking sem√¢ntico que preserva contexto
- Scraping de fontes oficiais
- Armazenamento rastre√°vel no Google Drive
- Auditoria completa de todas as opera√ß√µes


In [None]:
# Exemplo: baixar PDF para processamento
def baixar_pdf(url: str, destino: Path) -> Path:
    destino.parent.mkdir(parents=True, exist_ok=True)
    resposta = requests.get(url, timeout=30)
    resposta.raise_for_status()
    destino.write_bytes(resposta.content)
    return destino

pdf_url = "https://arxiv.org/pdf/2103.15348.pdf"
pdf_path = baixar_pdf(pdf_url, Path("./cache_juridico/exemplo.pdf"))
print(f"PDF salvo em: {pdf_path}")


## Camada 1: Orquestra√ß√£o

A classe `SistemaJuridicoUnificado` atua como facade, coordenando extra√ß√£o, chunking, indexa√ß√£o, scraping e auditoria.

### Integra√ß√µes reais (Docling, PageIndex, scraping)

Os componentes abaixo conectam o fluxo a extra√ß√£o real via Docling, indexa√ß√£o na API do PageIndex e scraping com Playwright, incluindo rate limiting.


In [None]:
class SistemaExtracaoDocling:
    def __init__(self) -> None:
        self.converter = DocumentConverter()

    def extrair(self, documento_path: Path) -> Dict[str, Any]:
        resultado = self.converter.convert(str(documento_path))
        documento = resultado.document
        texto = documento.export_to_text()
        return {
            "documento_id": documento_path.stem,
            "texto_completo": texto,
            "metadata": getattr(resultado, "metadata", {}),
        }


class PageIndexClient:
    def __init__(self, api_url: str, api_key: str) -> None:
        self.api_url = api_url
        self.api_key = api_key

    def indexar(self, documento_id: str, chunks: List[str]) -> Dict[str, Any]:
        resposta = requests.post(
            self.api_url,
            json={"documento_id": documento_id, "chunks": chunks},
            headers={"Authorization": f"Bearer {self.api_key}"},
            timeout=30,
        )
        resposta.raise_for_status()
        return resposta.json()


class SistemaDownload:
    def __init__(self, versao_fonte: str) -> None:
        self.versao_fonte = versao_fonte

    def registrar_fonte(self, origem: str) -> Dict[str, Any]:
        return {
            "origem": origem,
            "versao_fonte": self.versao_fonte,
            "data_captura": datetime.now().isoformat(),
        }


class SistemaScrapingJuridico:
    def __init__(self, downloader: SistemaDownload, rate_limit_seconds: float = 1.0) -> None:
        self.rate_limit_seconds = rate_limit_seconds
        self.downloader = downloader

    async def coletar_texto(self, url: str) -> Dict[str, Any]:
        async with async_playwright() as playwright:
            browser = await playwright.chromium.launch()
            page = await browser.new_page()
            await page.goto(url, wait_until="networkidle")
            await asyncio.sleep(self.rate_limit_seconds)
            texto = await page.inner_text("body")
            await browser.close()
            return {"texto": texto, "metadata": self.downloader.registrar_fonte(url)}


class ChatIndexPersistente:
    def __init__(self, storage_path: Path) -> None:
        self.storage_path = storage_path
        self.storage_path.parent.mkdir(parents=True, exist_ok=True)

    def registrar(self, conversa: Dict[str, Any]) -> None:
        with self.storage_path.open("a", encoding="utf-8") as handle:
            handle.write(json.dumps(conversa, ensure_ascii=False) + "\n")

    def carregar(self) -> List[Dict[str, Any]]:
        if not self.storage_path.exists():
            return []
        return [
            json.loads(linha)
            for linha in self.storage_path.read_text(encoding="utf-8").splitlines()
            if linha.strip()
        ]


In [None]:
# Exemplo: scraping simples de uma p√°gina
sistema = SistemaJuridicoUnificado(config)
texto_coletado = asyncio.run(sistema.scraper.coletar_texto("https://www.gov.br/"))
print(texto_coletado["metadata"])


In [None]:
class SistemaAuditoriaUnificado:
    def __init__(self, audit_dir: Path):
        self.audit_dir = audit_dir
        self.audit_dir.mkdir(parents=True, exist_ok=True)
        self.log_central: List[Dict[str, Any]] = []
        self.hash_registry: Dict[str, Dict[str, str]] = {}

    def registrar_evento(self, categoria: str, evento: Dict[str, Any]) -> str:
        evento_id = f"evt_{hashlib.md5(str(evento).encode()).hexdigest()[:10]}"
        evento["timestamp"] = evento.get("timestamp", datetime.now().isoformat())

        if self.log_central:
            evento["hash_anterior"] = self.hash_registry[self.log_central[-1]["evento_id"]]["hash"]

        hash_atual = hashlib.md5(json.dumps(evento, sort_keys=True).encode()).hexdigest()
        self.hash_registry[evento_id] = {"hash": hash_atual, "timestamp": evento["timestamp"]}

        registro = {**evento, "evento_id": evento_id, "hash_atual": hash_atual}
        self.log_central.append(registro)
        self._persistir_log(categoria, registro)
        return evento_id

    def _persistir_log(self, categoria: str, registro: Dict[str, Any]) -> None:
        destino = self.audit_dir / f"{categoria}.jsonl"
        with destino.open("a", encoding="utf-8") as handle:
            handle.write(json.dumps(registro, ensure_ascii=False) + "\n")


class SistemaJuridicoUnificado:
    def __init__(self, config: ConfigSistema):
        self.config = config
        self.auditoria = SistemaAuditoriaUnificado(config.audit_dir)
        self.extrator = SistemaExtracaoDocling()
        self.pageindex = PageIndexClient(config.pageindex_api_url, config.pageindex_api_key)
        self.downloader = SistemaDownload(config.versao_fonte)
        self.scraper = SistemaScrapingJuridico(self.downloader)
        self.chatindex = ChatIndexPersistente(config.chatindex_dir / "historico.jsonl")

    def _chunking_semantico(self, texto: str, tamanho: int = 800) -> List[str]:
        return [texto[i:i + tamanho] for i in range(0, len(texto), tamanho)]

    def _extrair_versoes_fontes(self, metadados_fontes: List[Dict[str, Any]]) -> List[str]:
    def _gerar_resposta_llm(self, consulta: str, fontes: List[str], metadados_fontes: List[Dict[str, Any]]) -> str:
        if not self.config.groq_api_key or self.config.groq_api_key == "SUA_CHAVE_AQUI":
            return ""
        analise_prompt = ChatPromptTemplate.from_messages([
            ("system", "Voc√™ √© um analista jur√≠dico. Extraia pontos-chave e contexto necess√°rio."),
            ("human", "Consulta: {consulta}\nFontes: {fontes}")
        ])
        resposta_prompt = ChatPromptTemplate.from_messages([
            ("system", "Voc√™ √© um assistente jur√≠dico. Responda com base nos pontos-chave e fontes."),
            ("human", "Consulta: {consulta}\nPontos-chave: {pontos}\nMetadados: {metadados}")
        ])
        llm_analise = ChatGroq(
            groq_api_key=self.config.groq_api_key,
            model=self.config.groq_model_analise,
            temperature=0.2,
        )
        llm_resposta = ChatGroq(
            groq_api_key=self.config.groq_api_key,
            model=self.config.groq_model_resposta,
            temperature=0.3,
        )
        pontos = (analise_prompt | llm_analise | StrOutputParser()).invoke({
            "consulta": consulta,
            "fontes": ", ".join(fontes),
        })
        return (resposta_prompt | llm_resposta | StrOutputParser()).invoke({
            "consulta": consulta,
            "pontos": pontos,
            "metadados": metadados_fontes,
        })

        return sorted({
            item.get("versao_fonte")
            for item in metadados_fontes
            if item.get("versao_fonte")
        })

    def processar_documento(self, documento_path: Path) -> Dict[str, Any]:
        self.auditoria.registrar_evento("processamento", {
            "tipo": "inicio_processamento",
            "documento": documento_path.name,
        })
        extracao = self.extrator.extrair(documento_path)
        chunks = self._chunking_semantico(extracao["texto_completo"])
        indexacao = self.pageindex.indexar(extracao["documento_id"], chunks)
        arvore = {"documento_id": documento_path.stem, "estrutura_arvore": {"raiz": {"titulo": documento_path.stem}}}

        self.auditoria.registrar_evento("processamento", {
            "tipo": "fim_processamento",
            "documento": documento_path.name,
            "chunks_gerados": len(chunks),
        })
        return {"extracao": extracao, "chunks": chunks, "arvore": arvore, "indexacao": indexacao}

    def responder_consulta(self, consulta: str, contexto: Dict[str, Any]) -> Dict[str, Any]:
        metadados_fontes = contexto.get("metadados_fontes", [])
        versoes_fontes = self._extrair_versoes_fontes(metadados_fontes)
        self.auditoria.registrar_evento("consultas", {
            "tipo": "consulta",
            "consulta": consulta,
            "versao_fonte": versoes_fontes,
        })
        resposta_texto = self._gerar_resposta_llm(consulta, contexto.get("fontes", []), metadados_fontes)
        resposta = {
            "consulta": consulta,
            "resposta": resposta_texto or f"Resumo jur√≠dico para: {consulta}",
            "fontes": contexto.get("fontes", []),
            "metadados_fontes": metadados_fontes,
            "versao_fonte": versoes_fontes,
        }
        self.auditoria.registrar_evento("consultas", {
            "tipo": "resposta",
            "consulta": consulta,
            "hash_resposta": hashlib.md5(json.dumps(resposta, ensure_ascii=False).encode()).hexdigest(),
            "versao_fonte": versoes_fontes,
        })
        return resposta


## Camada 2: Busca h√≠brida e ranking

A classe `SistemaBuscaHibrida` combina sinais do PageIndex e de embeddings vetoriais, com pesos configur√°veis e explicabilidade do ranking.


In [None]:
class SistemaBuscaHibrida:
    def __init__(self, peso_pageindex: float = 0.6, peso_vetorial: float = 0.4):
        self.peso_pageindex = peso_pageindex
        self.peso_vetorial = peso_vetorial

    def _normalizar(self, resultados: list[dict[str, float]]) -> dict[str, float]:
        if not resultados:
            return {}
        max_score = max(item.get("score", 0.0) for item in resultados) or 1.0
        return {item["id"]: item.get("score", 0.0) / max_score for item in resultados}

    def rankear(self, resultados_pageindex: list[dict[str, float]], resultados_vetoriais: list[dict[str, float]]):
        pageindex_norm = self._normalizar(resultados_pageindex)
        vetorial_norm = self._normalizar(resultados_vetoriais)
        ids = set(pageindex_norm) | set(vetorial_norm)
        combinados = []

        for doc_id in ids:
            score_pageindex = pageindex_norm.get(doc_id, 0.0)
            score_vetorial = vetorial_norm.get(doc_id, 0.0)
            score_final = (
                self.peso_pageindex * score_pageindex +
                self.peso_vetorial * score_vetorial
            )
            combinados.append({
                "id": doc_id,
                "score_final": score_final,
                "score_pageindex": score_pageindex,
                "score_vetorial": score_vetorial,
            })

        combinados.sort(key=lambda item: item["score_final"], reverse=True)
        return {
            "resultados": combinados,
            "explicabilidade": {
                "criterios": (
                    "Score final = peso_pageindex * score_pageindex_normalizado + peso_vetorial * score_vetorial_normalizado"
                ),
                "pesos": {
                    "peso_pageindex": self.peso_pageindex,
                    "peso_vetorial": self.peso_vetorial,
                },
            },
        }


busca_hibrida = SistemaBuscaHibrida(peso_pageindex=0.7, peso_vetorial=0.3)
rankeamento = busca_hibrida.rankear(
    resultados_pageindex=[{"id": "doc_1", "score": 0.9}, {"id": "doc_2", "score": 0.7}],
    resultados_vetoriais=[{"id": "doc_2", "score": 0.95}, {"id": "doc_3", "score": 0.6}],
)
rankeamento


## Chat Jur√≠dico Robusto

Este fluxo simula um chat jur√≠dico que:
- Mant√©m hist√≥rico de contexto
- Registra auditoria de cada intera√ß√£o
- Retorna respostas com fontes rastre√°veis


In [None]:
class ChatJuridico:
    def __init__(self, sistema: SistemaJuridicoUnificado):
        self.sistema = sistema
        self.historico: List[Dict[str, Any]] = []

    def enviar(self, consulta: str, fontes: Optional[List[str]] = None) -> Dict[str, Any]:
        fontes_list = fontes or []
        metadados_fontes = [
            self.sistema.downloader.registrar_fonte(fonte)
            for fonte in fontes_list
        ]
        contexto = {"fontes": fontes_list, "metadados_fontes": metadados_fontes}
        resposta = self.sistema.responder_consulta(consulta, contexto)
        self.historico.append({"consulta": consulta, "resposta": resposta})
        self.sistema.chatindex.registrar({"consulta": consulta, "resposta": resposta})
        return resposta


sistema = SistemaJuridicoUnificado(config)
chat = ChatJuridico(sistema)

chat.enviar("Quais s√£o os requisitos para tutela de urg√™ncia?", ["STJ", "Planalto"])

In [None]:
# Conversa√ß√£o de exemplo com o chat jur√≠dico
sistema = SistemaJuridicoUnificado(config)
chat = ChatJuridico(sistema)

chat.enviar("Quais s√£o os requisitos para tutela de urg√™ncia?", ["STJ", "Planalto"])
chat.enviar("Qual a base legal da responsabilidade civil objetiva?", ["STF", "Planalto"])


## Avalia√ß√£o e testes de regress√£o

Exemplos simples de m√©tricas para validar a qualidade do ranking e detectar regress√µes.


In [None]:
def recall_at_k(resultados: List[str], relevantes: List[str], k: int = 5) -> float:
    resultados_k = set(resultados[:k])
    relevantes_set = set(relevantes)
    if not relevantes_set:
        return 0.0
    return len(resultados_k & relevantes_set) / len(relevantes_set)


def avaliacao_regressao(resultados: List[str], relevantes: List[str], limite: float = 0.6) -> Dict[str, Any]:
    recall = recall_at_k(resultados, relevantes, k=5)
    aprovado = recall >= limite
    return {"recall@5": recall, "limite": limite, "aprovado": aprovado}


avaliacao_regressao(["doc_1", "doc_2", "doc_3"], ["doc_2", "doc_4"])


## Pr√≥ximos passos

1. Substituir os placeholders por integra√ß√µes reais (Docling, PageIndex API).
2. Implementar scraping com Playwright e rate limiting.
3. Enriquecer o ChatIndex com hist√≥rico persistente.
4. Adicionar testes de regress√£o e avalia√ß√£o de qualidade.
