# Notebook 2 — Preparação do Dataset para RAG

Este notebook aplica as correções e padronizações necessárias para deixar o dataset pronto para uso em um pipeline de RAG (Retrieval-Augmented Generation).

Nesta etapa:
- Corrigimos registros com inventário incorreto (ex.: PDF apontando para path inválido).
- Padronizamos campos textuais e metadados.
- Tratamos duplicados (mantendo os intencionais, com versionamento).
- Geramos o arquivo final `rag_dataset.csv`.
- Registramos todas as correções em logs para rastreabilidade.

In [1]:
from pathlib import Path
import pandas as pd
import os
import re

# notebook está em .../atividade6/notebooks
BASE_PATH = Path(os.getcwd()).parent              # .../atividade6
DATASET_PATH = BASE_PATH / "dataset_restaurante"  # .../atividade6/dataset_restaurante

# Aqui é a "raiz" dos arquivos (PDFs e imagens)
DATA_ROOT = DATASET_PATH

CSV_PATH = DATASET_PATH / "inventario_dataset.csv"

print("DATASET_PATH existe?", DATASET_PATH.exists())
print("CSV existe?", CSV_PATH.exists())
print("Pastas:", [p.name for p in DATASET_PATH.iterdir()])


DATASET_PATH existe? True
CSV existe? True
Pastas: ['fichas_tecnicas', 'imagens', 'inventario_curado.csv', 'inventario_dataset.csv', 'rag_dataset_chunks.csv']


In [2]:
df = pd.read_csv(CSV_PATH)

print("Linhas no inventário:", len(df))
df.head()

Linhas no inventário: 54


Unnamed: 0,document_id,tipo,path_arquivo,titulo,origem,data,categoria,versao,nivel_confianca
0,PDF_001,pdf,fichas_tecnicas/ficha_01_baiao_de_dois.pdf,Baiao-de-Dois,Ficha técnica oficial,2025-02-05,Tradicionais,v1.0,alto
1,PDF_002,pdf,fichas_tecnicas/ficha_02_favada.pdf,Favada,Ficha técnica oficial,2025-12-04,Tradicionais,v1.0,alto
2,PDF_003,pdf,fichas_tecnicas/ficha_03_feijao_de_corda.pdf,Feijao-de-Corda,Ficha técnica oficial,2025-05-31,Tradicionais,v1.0,alto
3,PDF_004,pdf,fichas_tecnicas/ficha_04_sarapatel.pdf,Sarapatel,Ficha técnica oficial,15/03/2025,Tradicionais,v1.0,alto
4,PDF_005,pdf,fichas_tecnicas/ficha_05_caldo_de_mocoto.pdf,Caldo de Mocoto,Ficha técnica oficial,11-05-2025,Tradicionais,v1.0,alto


In [3]:
def existe(path_rel: str) -> bool:
    return (DATA_ROOT / str(path_rel)).exists()

def normalizar_texto(x):
    if pd.isna(x):
        return ""
    x = str(x).strip()
    x = re.sub(r"\s+", " ", x)
    return x

## Etapa 1 — Correção dirigida do PDF_021

O documento `PDF_021` estava com tipo "pdf", porém apontando para um caminho incorreto (imagem inexistente).
Aqui corrigimos o `path_arquivo` para a ficha técnica correta do **Creme brûlée de doce de leite** e registramos a alteração em log.

In [4]:
PATH_CORRETO_PDF21 = "fichas_tecnicas/ficha_21_creme_brulee_de_doce_de_leite.pdf"

# antes
path_antigo = df.loc[df["document_id"] == "PDF_021", "path_arquivo"].iloc[0]

# aplica correção
df.loc[df["document_id"] == "PDF_021", "path_arquivo"] = PATH_CORRETO_PDF21
df.loc[df["document_id"] == "PDF_021", "tipo"] = "pdf"

log_fix_pdf21 = pd.DataFrame([{
    "document_id": "PDF_021",
    "campo": "path_arquivo",
    "valor_antigo": path_antigo,
    "valor_novo": PATH_CORRETO_PDF21,
    "arquivo_existe_depois": existe(PATH_CORRETO_PDF21),
    "observacao": "Correção do inventário: PDF apontava para path incorreto"
}])

log_fix_pdf21

Unnamed: 0,document_id,campo,valor_antigo,valor_novo,arquivo_existe_depois,observacao
0,PDF_021,path_arquivo,imagens/arquivo_nao_existe.jpg,fichas_tecnicas/ficha_21_creme_brulee_de_doce_...,True,Correção do inventário: PDF apontava para path...


ETAPA 2 — Padronização definitiva das nomenclaturas das imagens IMG_012 e IMG_022


In [5]:
df.loc[df["document_id"].isin(["IMG_012", "IMG_022"]),
       ["document_id", "tipo", "path_arquivo"]].assign(
    arquivo_existe=lambda x: x["path_arquivo"].apply(existe)
)

Unnamed: 0,document_id,tipo,path_arquivo,arquivo_existe
37,IMG_012,imagem,imagens/IMG_12.jpg,False
47,IMG_022,imagem,imagens/img_22pratofinal.jpg,False


In [6]:
from pathlib import Path

IMAGENS_DIR = DATASET_PATH / "imagens"

def renomear_imagem_para_padrao(df, document_id, novo_nome_arquivo):
    # registro atual
    row = df.loc[df["document_id"] == document_id].iloc[0]
    path_antigo_rel = row["path_arquivo"]
    path_antigo = DATASET_PATH / path_antigo_rel

    if not path_antigo.exists():
        raise FileNotFoundError(f"Arquivo não encontrado: {path_antigo}")

    novo_nome = novo_nome_arquivo.lower()
    path_novo = IMAGENS_DIR / novo_nome
    path_novo_rel = f"imagens/{novo_nome}"

    if path_novo.exists():
        raise FileExistsError(f"Já existe um arquivo com esse nome: {path_novo.name}")

    # renomeia no disco
    path_antigo.rename(path_novo)

    # atualiza o CSV (df)
    df.loc[df["document_id"] == document_id, "path_arquivo"] = path_novo_rel

    return {
        "document_id": document_id,
        "campo": "path_arquivo",
        "valor_antigo": path_antigo_rel,
        "valor_novo": path_novo_rel,
        "arquivo_existe_depois": path_novo.exists(),
        "observacao": "Padronização do nome do arquivo de imagem (IMG_###_nome_receita.jpg)"
    }

In [None]:
logs_imgs = []

logs_imgs.append(
    renomear_imagem_para_padrao(df, "IMG_012", "img_012_rabada.jpg")
)

logs_imgs.append(
    renomear_imagem_para_padrao(df, "IMG_022", "img_022_cocada_cremosa.jpg")
)

import pandas as pd
pd.DataFrame(logs_imgs)

In [None]:
df.loc[df["document_id"].isin(["IMG_012", "IMG_022"]),
       ["document_id", "path_arquivo"]].assign(
    arquivo_existe=lambda x: x["path_arquivo"].apply(existe)
)

ETAPA 3 — Correção e remoção dos arquivos Duplicados (PDF_006 e PDF_016)

In [None]:
df.loc[df["is_duplicate"] == True,
       ["document_id", "path_arquivo", "duplicate_group", "versao_rag"]]

In [None]:
# máscara para duplicados que NÃO queremos manter
mask_remover = (df["is_duplicate"] == True) & (df["versao_rag"] != "v1")

df_removidos = df[mask_remover].copy()
df_mantidos = df[~mask_remover].copy()

print("Registros removidos:", len(df_removidos))
print("Registros mantidos:", len(df_mantidos))

In [None]:
log_remocoes = df_removidos[[
    "document_id",
    "path_arquivo",
    "duplicate_group",
    "versao_rag",
    "duplicate_policy"
]].copy()

log_remocoes["acao"] = "removido_duplicado"
log_remocoes["observacao"] = "Duplicado intencional removido para normalização do dataset final"

log_remocoes

In [None]:
df = df_mantidos.copy()

# opcional: limpar colunas que não fazem mais sentido
df["versao_rag"] = "v1"
df["is_duplicate"] = False
df["duplicate_policy"] = ""

In [None]:
df.loc[df["document_id"].isin(["PDF_006", "PDF_016"]),
       ["document_id", "path_arquivo", "versao_rag"]]

In [None]:
OUT_INVENTARIO_CURADO = DATASET_PATH / "inventario_curado.csv"
df.to_csv(OUT_INVENTARIO_CURADO, index=False)

print("✅ Inventário final curado salvo em:", OUT_INVENTARIO_CURADO)

✅ Inventário final curado salvo em: c:\Users\BlueShift\Desktop\atividade6\dataset_restaurante\inventario_curado.csv


# Parte 2 — Preparação do Dataset para RAG (Retrieval-Augmented Generation)

Nesta etapa, o objetivo foi transformar o inventário curado de documentos em uma base
estruturada e adequada para uso em um sistema de Retrieval-Augmented Generation (RAG).

O foco desta fase não é ainda a interface do chatbot, mas sim garantir que o conteúdo
esteja corretamente extraído, normalizado e segmentado, permitindo recuperação eficiente
e respostas precisas em etapas posteriores.

## 2.1 Extração de Conteúdo Textual

Foram aplicadas técnicas distintas de extração conforme o tipo de documento:

- **PDFs (fichas técnicas)**:  
  O texto foi extraído diretamente dos arquivos PDF, preservando informações essenciais
  como ingredientes, modo de preparo, tempo, categoria e observações técnicas.

- **Imagens (JPEG)**:  
  Foi configurado OCR via Tesseract. No entanto, como as imagens representam apenas fotos
  ilustrativas dos pratos (sem texto embutido), a extração resultou em conteúdo vazio,
  comportamento esperado e documentado.

## 2.2 Limpeza e Normalização do Texto

Após a extração, os textos passaram por um processo de limpeza e normalização, incluindo:
- Remoção de caracteres inválidos e ruídos comuns de OCR
- Padronização de espaços e quebras de linha
- Garantia de consistência textual para posterior geração de embeddings

Esse passo é fundamental para melhorar a qualidade da recuperação semântica.

## 2.3 Segmentação em Chunks

Os textos normalizados foram segmentados em **chunks semânticos**, respeitando:
- Tamanho máximo por chunk
- Sobreposição (overlap) para preservação de contexto

Características observadas:
- Cada ficha técnica (PDF) gerou, em média, **1 chunk**
- Documentos ligeiramente maiores geraram **2 chunks**
- Imagens não geraram chunks, por não conterem texto

Ao final, foram gerados **28 chunks**, número coerente com o tamanho e a natureza do dataset.

## 2.4 Dataset Final para RAG

O resultado desta etapa é o arquivo:

**`rag_dataset_chunks.csv`**

Este dataset contém:
- Identificação do documento (`document_id`)
- Identificação do chunk (`chunk_id`)
- Conteúdo textual do chunk
- Metadados relevantes (categoria, origem, caminho do arquivo, etc.)

Esse arquivo representa a **base final de conhecimento** que será utilizada nas próximas
etapas de indexação vetorial, recuperação de contexto e geração de respostas com modelos
de linguagem (LLMs).

Com o dataset preparado, o projeto segue para a fase de **integração com LLM**, incluindo
indexação por embeddings, recuperação semântica e construção do chatbot.

In [1]:
import pandas as pd
from pathlib import Path

# caminhos
BASE_PATH = Path(r"c:\Users\BlueShift\Desktop\atividade6")
DATASET_PATH = BASE_PATH / "dataset_restaurante"

# sempre usar o inventário já curado
CSV_CURADO = DATASET_PATH / "inventario_curado.csv"
assert CSV_CURADO.exists(), f"Não achei {CSV_CURADO}"

df = pd.read_csv(CSV_CURADO)

print("✅ df carregado do inventário curado")
print("Linhas:", len(df))
print("Colunas:", list(df.columns))
df.head(3)


✅ df carregado do inventário curado
Linhas: 52
Colunas: ['document_id', 'tipo', 'path_arquivo', 'titulo', 'origem', 'data', 'categoria', 'versao', 'nivel_confianca', 'is_duplicate', 'duplicate_group', 'versao_rag', 'duplicate_policy']


Unnamed: 0,document_id,tipo,path_arquivo,titulo,origem,data,categoria,versao,nivel_confianca,is_duplicate,duplicate_group,versao_rag,duplicate_policy
0,PDF_001,pdf,fichas_tecnicas/ficha_01_baiao_de_dois.pdf,Baiao-de-Dois,Ficha técnica oficial,2025-02-05,Tradicionais,v1.0,alto,False,,v1,
1,PDF_002,pdf,fichas_tecnicas/ficha_02_favada.pdf,Favada,Ficha técnica oficial,2025-12-04,Tradicionais,v1.0,alto,False,,v1,
2,PDF_003,pdf,fichas_tecnicas/ficha_03_feijao_de_corda.pdf,Feijao-de-Corda,Ficha técnica oficial,2025-05-31,Tradicionais,v1.0,alto,False,,v1,


In [2]:
import pdfplumber

def extrair_texto_pdf(path_rel: str) -> str:
    path = DATASET_PATH / path_rel
    textos = []
    with pdfplumber.open(path) as pdf:
        for page in pdf.pages:
            textos.append(page.extract_text() or "")
    return "\n".join(textos).strip()

mask_pdf = df["tipo"].str.lower().eq("pdf")
df.loc[mask_pdf, "texto"] = df.loc[mask_pdf, "path_arquivo"].apply(extrair_texto_pdf)

df.loc[mask_pdf, ["document_id", "path_arquivo", "texto"]].head(2)

Unnamed: 0,document_id,path_arquivo,texto
0,PDF_001,fichas_tecnicas/ficha_01_baiao_de_dois.pdf,FICHA TÉCNICA - RESTAURANTE\nBaião-de-Dois\nCA...
1,PDF_002,fichas_tecnicas/ficha_02_favada.pdf,FICHA TÉCNICA - RESTAURANTE\nFavada\nCATEGORIA...


In [3]:
from PIL import Image
import pytesseract

def extrair_texto_imagem(path_rel: str) -> str:
    path = DATASET_PATH / path_rel
    img = Image.open(path)
    return pytesseract.image_to_string(img, lang="por").strip()

mask_img = df["tipo"].str.lower().eq("imagem")
df.loc[mask_img, "texto"] = df.loc[mask_img, "path_arquivo"].apply(extrair_texto_imagem)

df.loc[mask_img, ["document_id", "path_arquivo", "texto"]].head(2)

Unnamed: 0,document_id,path_arquivo,texto
26,IMG_001,imagens/img_01_baiao_de_dois.jpg,
27,IMG_002,imagens/img_02_favada.jpg,


In [4]:
import re

def limpar_texto(txt: str) -> str:
    if txt is None:
        return ""
    txt = str(txt)
    txt = txt.replace("\x0c", " ")  # lixo comum do OCR
    txt = re.sub(r"[ \t]+", " ", txt)
    txt = re.sub(r"\n{3,}", "\n\n", txt)
    return txt.strip()

df["texto_limpo"] = df["texto"].apply(limpar_texto)

df[["document_id", "tipo", "texto_limpo"]].head(2)


Unnamed: 0,document_id,tipo,texto_limpo
0,PDF_001,pdf,FICHA TÉCNICA - RESTAURANTE\nBaião-de-Dois\nCA...
1,PDF_002,pdf,FICHA TÉCNICA - RESTAURANTE\nFavada\nCATEGORIA...


In [5]:
df["n_chars"] = df["texto_limpo"].str.len()
df[["document_id", "tipo", "n_chars"]].sort_values("n_chars").head(10)

Unnamed: 0,document_id,tipo,n_chars
30,IMG_005,imagem,0
31,IMG_006,imagem,0
27,IMG_002,imagem,0
26,IMG_001,imagem,0
29,IMG_004,imagem,0
28,IMG_003,imagem,0
41,IMG_016,imagem,0
40,IMG_015,imagem,0
39,IMG_014,imagem,0
38,IMG_013,imagem,0


In [6]:
def chunk_text(texto: str, chunk_size: int = 800, overlap: int = 150):
    if not texto or len(texto.strip()) == 0:
        return []
    chunks = []
    start = 0
    n = len(texto)
    while start < n:
        end = min(start + chunk_size, n)
        chunk = texto[start:end].strip()
        if chunk:
            chunks.append(chunk)
        start = end - overlap
        if start < 0:
            start = 0
        if end == n:
            break
    return chunks

df["chunks"] = df["texto_limpo"].apply(chunk_text)
df[["document_id", "tipo"]].assign(qtd_chunks=df["chunks"].apply(len)).sort_values("qtd_chunks").head(15)

Unnamed: 0,document_id,tipo,qtd_chunks
30,IMG_005,imagem,0
31,IMG_006,imagem,0
27,IMG_002,imagem,0
26,IMG_001,imagem,0
29,IMG_004,imagem,0
28,IMG_003,imagem,0
41,IMG_016,imagem,0
40,IMG_015,imagem,0
39,IMG_014,imagem,0
38,IMG_013,imagem,0


In [7]:
df_chunks = df.explode("chunks").reset_index(drop=True)
df_chunks = df_chunks[df_chunks["chunks"].notna() & (df_chunks["chunks"].str.len() > 0)].copy()

df_chunks["chunk_id"] = df_chunks.groupby("document_id").cumcount().add(1)

# metadados úteis (mantém só os que existirem no seu df)
meta_cols = [c for c in ["document_id", "chunk_id", "chunks", "categoria", "origem", "titulo", "versao", "nivel_confianca", "tipo", "path_arquivo"] if c in df_chunks.columns]
rag_dataset = df_chunks[meta_cols].copy()

rag_dataset.head()

Unnamed: 0,document_id,chunk_id,chunks,categoria,origem,titulo,versao,nivel_confianca,tipo,path_arquivo
0,PDF_001,1,FICHA TÉCNICA - RESTAURANTE\nBaião-de-Dois\nCA...,Tradicionais,Ficha técnica oficial,Baiao-de-Dois,v1.0,alto,pdf,fichas_tecnicas/ficha_01_baiao_de_dois.pdf
1,PDF_001,2,"ntra por último, sendo misturado com\ncuidado ...",Tradicionais,Ficha técnica oficial,Baiao-de-Dois,v1.0,alto,pdf,fichas_tecnicas/ficha_01_baiao_de_dois.pdf
2,PDF_002,1,FICHA TÉCNICA - RESTAURANTE\nFavada\nCATEGORIA...,Tradicionais,Ficha técnica oficial,Favada,v1.0,alto,pdf,fichas_tecnicas/ficha_02_favada.pdf
3,PDF_002,2,PREPARO\n1h30\nRESTRIÇÕES ALIMENTARES\nSem glú...,Tradicionais,Ficha técnica oficial,Favada,v1.0,alto,pdf,fichas_tecnicas/ficha_02_favada.pdf
4,PDF_003,1,FICHA TÉCNICA - RESTAURANTE\nFeijão-de-Corda\n...,Tradicionais,Ficha técnica oficial,Feijao-de-Corda,v1.0,alto,pdf,fichas_tecnicas/ficha_03_feijao_de_corda.pdf


In [8]:
OUT_RAG_CHUNKS = DATASET_PATH / "rag_dataset_chunks.csv"
rag_dataset.to_csv(OUT_RAG_CHUNKS, index=False)

print("✅ Dataset RAG (chunks) salvo em:", OUT_RAG_CHUNKS)
print("Total de chunks:", len(rag_dataset))


✅ Dataset RAG (chunks) salvo em: c:\Users\BlueShift\Desktop\atividade6\dataset_restaurante\rag_dataset_chunks.csv
Total de chunks: 28


In [9]:
print("Resumo RAG")
print("Docs no inventário:", len(df))
print("PDFs:", (df["tipo"].str.lower() == "pdf").sum())
print("Imagens:", (df["tipo"].str.lower() == "imagem").sum())
print("Chunks gerados:", len(rag_dataset))
print("Média chunks por doc:", round(len(rag_dataset) / len(df), 2))


Resumo RAG
Docs no inventário: 52
PDFs: 26
Imagens: 26
Chunks gerados: 28
Média chunks por doc: 0.54


# Parte 3 — Integração com LLM (Azure) e Chatbot RAG

Nesta parte, transformamos o dataset segmentado em um chatbot funcional com RAG (Retrieval-Augmented Generation).

Até aqui já concluímos:
- Curadoria do inventário e correção de inconsistências (paths, nomes e duplicados)
- Extração de texto dos PDFs e organização do conteúdo
- Segmentação do texto em chunks e geração do arquivo `rag_dataset_chunks.csv` (base final para recuperação)

A partir deste ponto, entramos na fase de **LLM + Recuperação**, que envolve:

## 3.1 Preparação de credenciais (Azure Key Vault)
Objetivo:
- Conectar no Azure Key Vault para recuperar com segurança as credenciais necessárias (sem expor chaves no código)
- Carregar variáveis essenciais para o uso de embeddings e geração de resposta (LLM)

Entregáveis dessa etapa:
- Conexão validada com o Key Vault
- Secrets carregados no runtime (ex.: endpoint e key do Azure OpenAI ou chave de API)

## 3.2 Indexação Vetorial (VectorStore)

Objetivo:
- Criar um índice vetorial (VectorStore) a partir dos chunks gerados em `rag_dataset_chunks.csv`
- Permitir recuperação eficiente por similaridade

Implementação adotada:
- **Embeddings locais (SentenceTransformers)** para vetorização dos chunks
- **FAISS** como VectorStore (índice vetorial local)

Observação:
- A geração final de respostas (LLM) permanece no **Azure OpenAI** (via Key Vault),
  garantindo padronização e uso de credenciais corporativas.

## 3.3 Recuperação (Retrieve)
Objetivo:
- Dada uma pergunta do usuário, calcular o embedding da pergunta
- Buscar os Top-k chunks mais relevantes com base em similaridade (cosseno)

Resultado esperado:
- Lista de chunks relevantes + metadados (document_id, chunk_id, path_arquivo, score)

## 3.4 Geração de Resposta (Generate)
Objetivo:
- Montar um prompt com a pergunta + contexto recuperado (chunks)
- Solicitar a resposta ao modelo de linguagem (LLM), mantendo a resposta ancorada nos documentos recuperados

Resultado esperado:
- Resposta final + rastreabilidade

## 3.5 Chatbot (Loop de Perguntas) + Fontes
Objetivo:
- Criar um fluxo de chat simples (no notebook) ou interface futura (ex.: Streamlit)
- Exibir a resposta e também as fontes utilizadas (documentos e chunks)

Resultado esperado:
- Chatbot funcional com RAG e transparência de fontes

In [10]:
from pathlib import Path
import pandas as pd
import numpy as np
import faiss

BASE_PATH = Path(r"c:\Users\BlueShift\Desktop\atividade6")
DATASET_PATH = BASE_PATH / "dataset_restaurante"

print("BASE_PATH:", BASE_PATH)
print("DATASET_PATH existe?", DATASET_PATH.exists())
print("Conteúdo:", [p.name for p in DATASET_PATH.iterdir()])

BASE_PATH: c:\Users\BlueShift\Desktop\atividade6
DATASET_PATH existe? True
Conteúdo: ['fichas_tecnicas', 'imagens', 'inventario_curado.csv', 'inventario_dataset.csv', 'rag_dataset_chunks.csv']


In [11]:
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient
KEY_VAULT_NAME = "kv-academy-01"
KV_URI = f"https://kv-academy-01.vault.azure.net"

credential = DefaultAzureCredential()
kv_client = SecretClient(vault_url=KV_URI, credential=credential)

print("✅ Conectado ao Key Vault:", KV_URI)


✅ Conectado ao Key Vault: https://kv-academy-01.vault.azure.net


In [12]:
SECRET_ENDPOINT    = "URL-API-GPT"
SECRET_API_VERSION = "VERSION-API-GPT"
SECRET_API_KEY     = "KEY-API-GPT"     
SECRET_DEPLOY_CHAT = "MODELO-APT-GPT"

AZURE_OPENAI_ENDPOINT     = kv_client.get_secret(SECRET_ENDPOINT).value
AZURE_OPENAI_API_VERSION  = kv_client.get_secret(SECRET_API_VERSION).value
AZURE_OPENAI_API_KEY      = kv_client.get_secret(SECRET_API_KEY).value
AZURE_OPENAI_CHAT_DEPLOY  = kv_client.get_secret(SECRET_DEPLOY_CHAT).value

print("✅ Azure OpenAI carregado do Key Vault:")
print("Endpoint:", AZURE_OPENAI_ENDPOINT)
print("API Version:", AZURE_OPENAI_API_VERSION)
print("Chat deployment:", AZURE_OPENAI_CHAT_DEPLOY)


✅ Azure OpenAI carregado do Key Vault:
Endpoint: https://oai-academy-ia.openai.azure.com/openai/deployments/gpt-35-turbo/chat/completions?api-version=2025-01-01-preview
API Version: 2024-12-01-preview
Chat deployment: gpt-4.1-mini


In [13]:
from openai import AzureOpenAI

client = AzureOpenAI(
    api_key=AZURE_OPENAI_API_KEY,
    api_version=AZURE_OPENAI_API_VERSION,
    azure_endpoint=AZURE_OPENAI_ENDPOINT,
)

resp = client.chat.completions.create(
    model=AZURE_OPENAI_CHAT_DEPLOY,
    messages=[{"role": "user", "content": "Responda apenas com OK."}],
    temperature=0
)

print("✅ Teste chat:", resp.choices[0].message.content)

✅ Teste chat: OK


In [14]:
RAG_CHUNKS_PATH = DATASET_PATH / "rag_dataset_chunks.csv"
assert RAG_CHUNKS_PATH.exists(), f"Não encontrei: {RAG_CHUNKS_PATH}"

rag_dataset = pd.read_csv(RAG_CHUNKS_PATH)

print("✅ Chunks carregados:", len(rag_dataset))
print("Colunas:", list(rag_dataset.columns))
rag_dataset.head(2)


✅ Chunks carregados: 28
Colunas: ['document_id', 'chunk_id', 'chunks', 'categoria', 'origem', 'titulo', 'versao', 'nivel_confianca', 'tipo', 'path_arquivo']


Unnamed: 0,document_id,chunk_id,chunks,categoria,origem,titulo,versao,nivel_confianca,tipo,path_arquivo
0,PDF_001,1,FICHA TÉCNICA - RESTAURANTE\nBaião-de-Dois\nCA...,Tradicionais,Ficha técnica oficial,Baiao-de-Dois,v1.0,alto,pdf,fichas_tecnicas/ficha_01_baiao_de_dois.pdf
1,PDF_001,2,"ntra por último, sendo misturado com\ncuidado ...",Tradicionais,Ficha técnica oficial,Baiao-de-Dois,v1.0,alto,pdf,fichas_tecnicas/ficha_01_baiao_de_dois.pdf


In [15]:
import re
import unicodedata

CATEGORIAS_OFICIAIS = ["Tradicional", "Especialidade", "Salada", "Sobremesa"]

def _strip_accents(s: str) -> str:
    s = unicodedata.normalize("NFKD", s)
    return "".join(c for c in s if not unicodedata.combining(c))

def norm_cat(s: str) -> str:
    if not isinstance(s, str):
        return ""
    t = _strip_accents(s.strip().lower())

    if "tradicion" in t: return "Tradicional"
    if "especial" in t: return "Especialidade"
    if "salad" in t: return "Salada"
    if "sobremes" in t: return "Sobremesa"
    return ""

def extract_cat_from_chunk(text: str) -> str:
    if not isinstance(text, str):
        return ""
    # Tenta pegar "CATEGORIA: X" ou "CATEGORIA\nX"
    m = re.search(r'(?i)\bCATEGORIA\b\s*[:\n]\s*([^\n\r]+)', text)
    return m.group(1).strip() if m else ""

# 1) Normaliza a categoria que veio no CSV
rag_dataset["categoria_norm"] = rag_dataset["categoria"].apply(norm_cat)

# 2) Extrai a categoria escrita dentro do chunk (quando existir)
rag_dataset["categoria_in_chunk"] = rag_dataset["chunks"].apply(extract_cat_from_chunk)
rag_dataset["categoria_chunk_norm"] = rag_dataset["categoria_in_chunk"].apply(norm_cat)

# 3) Categoria corrigida final: usa a do chunk se existir, senão a do CSV
rag_dataset["categoria_corr"] = rag_dataset.apply(
    lambda r: r["categoria_chunk_norm"] if r["categoria_chunk_norm"] else r["categoria_norm"],
    axis=1
)

print("Categorias corrigidas (categoria_corr):")
print(rag_dataset["categoria_corr"].value_counts())


Categorias corrigidas (categoria_corr):
categoria_corr
Sobremesa        10
Especialidade     8
Tradicional       7
Salada            3
Name: count, dtype: int64


In [16]:
def listar_categorias():
    # Sempre retorna as 4 categorias oficiais, na mesma ordem
    return CATEGORIAS_OFICIAIS

def extrair_categoria_da_pergunta(pergunta: str) -> str | None:
    p = pergunta.lower()
    if "tradicion" in p: return "Tradicional"
    if "especial" in p: return "Especialidade"
    if "salad" in p: return "Salada"
    if "sobremes" in p: return "Sobremesa"
    return None

def listar_pratos_da_categoria(cat: str):
    # Para listar pratos do cardápio, consideramos só os PDFs (receitas/pratos)
    df_menu = rag_dataset[rag_dataset["tipo"].astype(str).str.lower() == "pdf"].copy()

    pratos = (
        df_menu.loc[df_menu["categoria_corr"] == cat, "titulo"]
        .dropna()
        .astype(str)
        .str.strip()
        .unique()
        .tolist()
    )
    pratos = sorted(set([p for p in pratos if p and p.lower() != "nan"]))
    return pratos


In [17]:
from sentence_transformers import SentenceTransformer

model_st = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

texts = rag_dataset["chunks"].fillna("").astype(str).tolist()

E = model_st.encode(
    texts,
    show_progress_bar=True,
    convert_to_numpy=True,
    normalize_embeddings=True
).astype("float32")

print("✅ Embeddings locais gerados:", E.shape)

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

✅ Embeddings locais gerados: (28, 384)


In [19]:
dim = E.shape[1]
index = faiss.IndexFlatIP(dim)   # IP + normalizado => cosseno
index.add(E)

print("✅ VectorStore FAISS criado. Vetores:", index.ntotal)

✅ VectorStore FAISS criado. Vetores: 28


In [20]:
def retrieve_faiss(query: str, top_k: int = 10):
    q = model_st.encode([query], convert_to_numpy=True, normalize_embeddings=True).astype("float32")
    scores, idx = index.search(q, top_k)

    hits = rag_dataset.iloc[idx[0]].copy()
    hits["score"] = scores[0]
    return hits.sort_values("score", ascending=False)

In [47]:
def format_context(rows, max_chars=4500):
    parts, total = [], 0
    for r in rows.itertuples():
        tag = f"[Fonte: {r.document_id} | chunk {r.chunk_id} | {r.path_arquivo}]"
        block = f"{tag}\n{str(r.chunks).strip()}\n"
        if total + len(block) > max_chars:
            break
        parts.append(block)
        total += len(block)
    return "\n".join(parts)


import pandas as pd
import re
import unicodedata


CATEGORIAS_OFICIAIS = ["Tradicional", "Especialidade", "Salada", "Sobremesa"]


def hits_empty(query: str = "", fonte: str = "rag_dataset_chunks.csv (consulta estruturada)"):
    return pd.DataFrame([{
        "document_id": fonte,
        "chunk_id": "-",
        "score": 1.0,
        "categoria": "-",
        "categoria_corr": "-",
        "titulo": "-",
        "tipo": "dataset",
        "path_arquivo": fonte
    }])


def listar_categorias():
    return CATEGORIAS_OFICIAIS


def extrair_categoria_da_pergunta(pergunta: str):
    p = pergunta.lower()
    if "tradicion" in p:
        return "Tradicional"
    if "especial" in p:
        return "Especialidade"
    if "salad" in p:
        return "Salada"
    if "sobremes" in p:
        return "Sobremesa"
    return None


def listar_pratos_da_categoria(cat: str):
    df_menu = rag_dataset[rag_dataset["tipo"].astype(str).str.lower() == "pdf"].copy()
    pratos = (
        df_menu.loc[df_menu["categoria_corr"] == cat, "titulo"]
        .dropna()
        .astype(str)
        .str.strip()
        .unique()
        .tolist()
    )
    return sorted(set([p for p in pratos if p and p.lower() != "nan"]))


# -----------------------------
# NOVO: detectar "categoria de um prato"
# -----------------------------
def _norm_text(s: str) -> str:
    s = "" if s is None else str(s)
    s = unicodedata.normalize("NFKD", s)
    s = "".join(c for c in s if not unicodedata.combining(c))
    s = s.lower()
    s = re.sub(r"[^a-z0-9\s]+", " ", s)
    s = re.sub(r"\s+", " ", s).strip()
    return s


def _build_menu_maps():
    # usa apenas PDFs (pratos)
    df_menu = rag_dataset[rag_dataset["tipo"].astype(str).str.lower() == "pdf"][["titulo", "categoria_corr"]].dropna()
    df_menu = df_menu.drop_duplicates()

    titulo_norm_to_orig = {_norm_text(t): t for t in df_menu["titulo"].tolist()}
    titulo_norm_to_cat = {_norm_text(t): c for t, c in zip(df_menu["titulo"], df_menu["categoria_corr"])}
    return titulo_norm_to_orig, titulo_norm_to_cat


# constrói mapas uma vez (reconstrói ao reexecutar a célula)
TITULO_NORM_TO_ORIG, TITULO_NORM_TO_CAT = _build_menu_maps()


def eh_pergunta_categoria_de_prato(pergunta: str) -> bool:
    p = pergunta.lower()
    gatilhos = [
        "qual a categoria",
        "qual é a categoria",
        "qual categoria",
        "em qual categoria",
        "categoria do prato",
        "categoria da receita",
        "esse prato é de qual categoria",
        "essa receita é de qual categoria",
    ]
    return ("categoria" in p) and any(g in p for g in gatilhos)


def encontrar_prato_na_pergunta(pergunta: str):
    qn = _norm_text(pergunta)

    # 1) tenta achar título completo como substring
    for tnorm in sorted(TITULO_NORM_TO_ORIG.keys(), key=len, reverse=True):
        if tnorm and tnorm in qn:
            return tnorm

    # 2) fallback por overlap de palavras (mínimo 2)
    q_tokens = set(qn.split())
    best, best_score = None, 0
    for tnorm in TITULO_NORM_TO_ORIG.keys():
        t_tokens = set(tnorm.split())
        score = len(q_tokens & t_tokens)
        if score > best_score and score >= 2:
            best, best_score = tnorm, score
    return best


# -----------------------------
# Rotas / itents
# -----------------------------
def eh_pergunta_listar_itens_categoria(pergunta: str) -> bool:
    p = pergunta.lower()
    tem_intencao = any(
        k in p
        for k in [
            "liste",
            "listar",
            "quais pratos",
            "quais itens",
            "itens da categoria",
            "pratos da categoria",
            "menu da categoria",
        ]
    )
    return tem_intencao and (extrair_categoria_da_pergunta(pergunta) is not None)


def eh_pergunta_de_categorias(pergunta: str) -> bool:
    p = pergunta.lower().strip()

    # não confundir com "categoria de um prato"
    if eh_pergunta_categoria_de_prato(pergunta):
        return False

    gatilhos = [
        "quantas categorias",
        "liste as categorias",
        "listar categorias",
        "quais sao as categorias",
        "quais são as categorias",
        "categorias do cardapio",
        "categorias do cardápio",
        "todas as categorias",
    ]
    return any(g in p for g in gatilhos)


def generate_answer(query: str, top_k: int = 5, temperature: float = 0.2, min_score: float = 0.28):
    q = query.lower()

    # 1) LISTAR PRATOS POR CATEGORIA (vem primeiro!)
    if eh_pergunta_listar_itens_categoria(query):
        cat = extrair_categoria_da_pergunta(query)
        pratos = listar_pratos_da_categoria(cat)

        if not pratos:
            return f"Não encontrei pratos para a categoria **{cat}** na base atual.", hits_empty()

        texto = f"Pratos da categoria **{cat}**:\n- " + "\n- ".join(pratos)
        texto += f"\n\nTotal: {len(pratos)} pratos."
        return texto, hits_empty(query, f"rag_dataset_chunks.csv (lista de pratos: {cat})")

    # 2) CATEGORIA DE UM PRATO ESPECÍFICO
    if eh_pergunta_categoria_de_prato(query):
        tnorm = encontrar_prato_na_pergunta(query)
        if tnorm and tnorm in TITULO_NORM_TO_CAT:
            prato = TITULO_NORM_TO_ORIG[tnorm]
            cat = TITULO_NORM_TO_CAT[tnorm]
            return f"O prato **{prato}** fica na categoria **{cat}**.", hits_empty(query, f"rag_dataset_chunks.csv (match título: {prato})")
        return (
            "Não consegui identificar o nome do prato na sua pergunta. "
            "Digite o nome exato do prato (como no cardápio) que eu te falo a categoria.",
            hits_empty(),
        )

    # 3) LISTAR CATEGORIAS DO CARDÁPIO
    if eh_pergunta_de_categorias(query):
        cats = listar_categorias()
        if "quantas" in q:
            texto = f"No cardápio existem {len(cats)} categorias:\n- " + "\n- ".join(cats)
        else:
            texto = "As categorias no cardápio são:\n- " + "\n- ".join(cats)
        texto += f"\n\nTotal: {len(cats)} categorias."
        return texto, hits_empty(query, "rag_dataset_chunks.csv (categorias oficiais)")

    # 4) RAG NORMAL
    hits = retrieve_faiss(query, top_k=max(top_k, 20))
    if min_score is not None:
        hits = hits[hits["score"] >= min_score]

    # OBS: se preço/detalhe vier ruim, troque por hits = hits.head(top_k).copy()
    hits = hits.drop_duplicates(subset=["document_id"]).head(top_k).copy()

    context = format_context(hits)

    system = (
        "Você é um assistente de um restaurante. "
        "Responda SOMENTE com base no CONTEXTO fornecido. "
        "NUNCA invente categorias, pratos ou preços. "
        "Se o contexto não tiver a informação, diga que não encontrou na base. "
        "No final, liste as fontes usadas no formato: document_id (chunk_id)."
    )

    user = f"PERGUNTA:\n{query}\n\nCONTEXTO:\n{context}"

    resp = client.chat.completions.create(
        model=AZURE_OPENAI_CHAT_DEPLOY,
        messages=[
            {"role": "system", "content": system},
            {"role": "user", "content": user},
        ],
        temperature=temperature,
    )

    return resp.choices[0].message.content, hits


In [41]:
import re
import unicodedata

def _norm_text(s: str) -> str:
    s = "" if s is None else str(s)
    s = unicodedata.normalize("NFKD", s)
    s = "".join(c for c in s if not unicodedata.combining(c))
    s = s.lower()
    s = re.sub(r"[^a-z0-9\s]+", " ", s)
    s = re.sub(r"\s+", " ", s).strip()
    return s

# Mapa de pratos -> categoria (apenas PDFs = pratos do cardápio)
_TITULOS_MENU = (
    rag_dataset[rag_dataset["tipo"].astype(str).str.lower() == "pdf"][["titulo","categoria_corr"]]
    .dropna()
    .drop_duplicates()
)

TITULO_NORM_TO_ORIG = { _norm_text(t): t for t in _TITULOS_MENU["titulo"].tolist() }
TITULO_NORM_TO_CAT  = { _norm_text(t): c for t,c in zip(_TITULOS_MENU["titulo"], _TITULOS_MENU["categoria_corr"]) }

def eh_pergunta_categoria_de_prato(pergunta: str) -> bool:
    p = pergunta.lower()
    # exemplos: "qual a categoria da rabada", "em qual categoria fica rabada"
    gatilhos = [
        "qual a categoria", "qual é a categoria", "qual categoria",
        "em qual categoria", "essa receita é de qual categoria",
        "esse prato é de qual categoria", "categoria do prato", "categoria da receita"
    ]
    return ("categoria" in p) and any(g in p for g in gatilhos)

def encontrar_prato_na_pergunta(pergunta: str) -> str | None:
    qn = _norm_text(pergunta)

    # tenta match por substring do título no texto da pergunta (mais confiável)
    # ordena por tamanho desc pra pegar títulos maiores primeiro
    for tnorm in sorted(TITULO_NORM_TO_ORIG.keys(), key=len, reverse=True):
        if tnorm and tnorm in qn:
            return tnorm

    # fallback: tenta match por palavras (interseção)
    q_tokens = set(qn.split())
    best, best_score = None, 0
    for tnorm in TITULO_NORM_TO_ORIG.keys():
        t_tokens = set(tnorm.split())
        score = len(q_tokens & t_tokens)
        if score > best_score and score >= 2:  # pelo menos 2 palavras em comum
            best, best_score = tnorm, score
    return best


In [57]:
ans, used = generate_answer("Quantas categorias possui", top_k=5)
print(ans)

if used is not None and len(used) > 0:
    used_cols = [c for c in ["document_id","chunk_id","score","categoria_norm","titulo","path_arquivo"] if c in used.columns]
    display(used[used_cols].head(1))
else:
    print("Sem fontes para exibir (resposta não usou RAG ou não houve hits).")


No cardápio existem 4 categorias:
- Tradicional
- Especialidade
- Salada
- Sobremesa

Total: 4 categorias.


Unnamed: 0,document_id,chunk_id,score,titulo,path_arquivo
0,rag_dataset_chunks.csv (categorias oficiais),-,1.0,-,rag_dataset_chunks.csv (categorias oficiais)
