# MODELOS

In [None]:
from pydantic import BaseModel
from typing import List, Optional

class Document(BaseModel):
    """A loaded source document: an identifier, its text, and optional metadata."""

    # Identifier of the document; DocumentLoader.load_documents uses the file path.
    id: str
    # Text content of the document.
    content: str
    # Optional free-form metadata; None when nothing was attached.
    metadata: Optional[dict] = None

class Chunk(BaseModel):
    """A piece of a document's text, optionally carrying its embedding vector."""

    # Identifier of the chunk; RAGPipeline.add_documents builds it as
    # "<document id>_chunk_<index>".
    id: str
    # Identifier of the Document this chunk was cut from.
    document_id: str
    # Text of the chunk.
    content: str
    # Embedding vector of `content`; None when it has not been computed.
    embedding: Optional[List[float]] = None

class Query(BaseModel):
    """A retrieval request: the question text and how many results to fetch."""

    # Question text.
    question: str
    # Number of results requested; defaults to 3.
    top_k: int = 3

class RetrievedDocument(BaseModel):
    """A chunk returned by a search, paired with the score of the match."""

    # The matched chunk.
    chunk: Chunk
    # Score of the match. RAGPipeline.query fills it with the distance
    # returned by the FAISS L2 index, so smaller values mean closer matches.
    relevance_score: float

class Answer(BaseModel):
    """Result of a pipeline query: generated text plus the supporting chunks."""

    # Text generated by the language model.
    answer: str
    # Chunks retrieved for the question, with their scores.
    retrieved_documents: List[RetrievedDocument]
    # Elapsed wall-clock time of the query in seconds (difference of two
    # time.time() readings in RAGPipeline.query); None when not measured.
    processing_time: Optional[float] = None

# CARREGAR DOCUMENTO

In [None]:
from typing import List
from pathlib import Path
from core.models import Document
import docx2txt
import PyPDF2

class DocumentLoader:
    """Read PDF and DOCX files from disk and wrap their text in ``Document`` models."""

    @staticmethod
    def load_docx2txt(file_path: str) -> str:
        """Extract the text of a ``.docx`` file with ``docx2txt``.

        Args:
            file_path: Path of the ``.docx`` file to read.

        Returns:
            The non-blank lines of the document joined with single newlines.
        """
        # docx2txt exposes one ``process`` function that returns the whole
        # text; it has no python-docx style ``Document``/``paragraphs`` API.
        text = docx2txt.process(file_path) or ""
        return '\n'.join(line for line in text.splitlines() if line.strip())

    @staticmethod
    def load_pdf(file_path: str) -> str:
        """Extract the text of every page of a PDF file with ``PyPDF2``.

        Args:
            file_path: Path of the PDF file to read.

        Returns:
            The text of all pages concatenated in page order. Pages for which
            ``extract_text`` yields nothing contribute an empty string.
        """
        with open(file_path, 'rb') as f:
            reader = PyPDF2.PdfReader(f)
            # Join once instead of growing a string page by page.
            return "".join(page.extract_text() or "" for page in reader.pages)

    @staticmethod
    def load_documents(paths: List[str]) -> List[Document]:
        """Load every supported file in ``paths`` as a ``Document``.

        The file extension (case-insensitive) selects the loader: ``.pdf`` or
        ``.docx``. Paths with any other extension are skipped.

        Args:
            paths: File paths to load.

        Returns:
            One ``Document`` per supported path, in input order, whose ``id``
            is the path as a string.
        """
        loaders = {
            '.pdf': DocumentLoader.load_pdf,
            '.docx': DocumentLoader.load_docx2txt,
        }
        docs = []
        for path in paths:
            loader = loaders.get(Path(path).suffix.lower())
            if loader is None:
                # Unsupported file type: skip it, as the pipeline only
                # handles PDF and DOCX sources.
                continue
            docs.append(Document(id=str(path), content=loader(path)))
        return docs


# EMBEDDING

In [None]:
from sentence_transformers import SentenceTransformer
from typing import List

class EmbeddingModel:
    """Thin wrapper around a ``SentenceTransformer`` that returns plain float lists."""

    def __init__(self, model_name: str = 'sentence-transformers/all-MiniLM-L6-v2'):
        """Load the sentence-transformers model named ``model_name``."""
        self.model = SentenceTransformer(model_name)

    def embed(self, texts: List[str]) -> List[List[float]]:
        """Encode ``texts`` and return one list of floats per input text.

        Args:
            texts: Texts to encode, passed to the model in a single call.

        Returns:
            The embeddings in input order, each as a plain Python list so the
            result does not depend on the device or array type the model used.
        """
        embeddings = self.model.encode(texts, convert_to_numpy=False)
        return [self._to_float_list(emb) for emb in embeddings]

    @staticmethod
    def _to_float_list(emb) -> List[float]:
        """Convert one embedding (tensor, array or sequence) to a list of floats."""
        if hasattr(emb, 'cpu'):
            # Tensor-like object: move it to the CPU before converting.
            emb = emb.cpu().numpy()
        if hasattr(emb, 'tolist'):
            return emb.tolist()
        # Any other sequence: never hand the raw object back to the caller,
        # the declared return type is a list of floats.
        return [float(value) for value in emb]


# LLM

In [None]:
from transformers import pipeline

class HuggingFaceLLM:
    """Text generator backed by a Hugging Face ``text2text-generation`` pipeline."""

    def __init__(self, model_name: str = 'google/flan-t5-base'):
        """Create the generation pipeline for ``model_name``."""
        self.generator = pipeline('text2text-generation', model=model_name)

    def generate(self, prompt: str, max_input_length: int = 400, max_new_tokens: int = 1024) -> str:
        """Generate text for ``prompt``.

        Args:
            prompt: Input text for the model.
            max_input_length: Maximum prompt length in characters. A longer
                prompt is cut at this length and ``"..."`` is appended, so
                callers should keep the important part at the start or stay
                within the limit.
            max_new_tokens: Maximum number of tokens to generate.

        Returns:
            The generated text of the first pipeline result.
        """
        # Keep the prompt short enough for the model, with a safety margin.
        if len(prompt) > max_input_length:
            prompt = prompt[:max_input_length] + "..."

        # Only max_new_tokens is passed: supplying max_length as well sets two
        # conflicting limits, of which max_new_tokens takes precedence anyway.
        result = self.generator(prompt, max_new_tokens=max_new_tokens)
        first = result[0]
        return first['generated_text'] if 'generated_text' in first else first['text']


# VETORIZANDO TEXTO

In [None]:
import faiss
import numpy as np
from typing import List, Tuple
from core.models import Chunk

class FAISSVectorStore:
    """In-memory vector store: a FAISS ``IndexFlatL2`` plus the chunks it indexes."""

    def __init__(self, embedding_dim: int):
        """Create an empty store for vectors of size ``embedding_dim``."""
        self.embedding_dim = embedding_dim
        self.index = faiss.IndexFlatL2(embedding_dim)
        # chunks[i] is the chunk whose embedding was added as row i of the index.
        self.chunks: List[Chunk] = []

    def add_chunks(self, chunks: List[Chunk]):
        """Add the embeddings of ``chunks`` to the index and remember the chunks.

        Args:
            chunks: Chunks to index; each must already carry an embedding.
                An empty list is a no-op.

        Raises:
            ValueError: If any chunk has no embedding.
        """
        if not chunks:
            # An empty batch would produce a zero-dimensional array that the
            # index cannot accept.
            return
        missing = [c.id for c in chunks if c.embedding is None]
        if missing:
            raise ValueError(f"Chunks without embedding cannot be indexed: {missing}")
        embeddings = np.array([c.embedding for c in chunks], dtype='float32')
        self.index.add(embeddings)
        self.chunks.extend(chunks)

    def search(self, query_embedding: List[float], top_k: int = 5) -> List[Tuple[Chunk, float]]:
        """Return up to ``top_k`` chunks closest to ``query_embedding``.

        Args:
            query_embedding: Embedding of the query.
            top_k: Maximum number of results.

        Returns:
            ``(chunk, distance)`` pairs in the order returned by the index.
            The distance comes from the L2 index, so smaller means closer.
            Empty when the store is empty or ``top_k`` is not positive.
        """
        if top_k <= 0 or not self.chunks:
            return []
        query = np.array([query_embedding], dtype='float32')
        distances, indices = self.index.search(query, top_k)
        results = []
        for idx, distance in zip(indices[0], distances[0]):
            # FAISS fills missing results with index -1 when it holds fewer
            # than top_k vectors; a bare upper-bound check would let -1
            # through and return the last chunk as a bogus hit.
            if 0 <= idx < len(self.chunks):
                results.append((self.chunks[int(idx)], float(distance)))
        return results

# PIPELINES DO RAG

In [None]:
from core.document_loader import DocumentLoader
from core.embedding import EmbeddingModel
from core.vector_store import FAISSVectorStore
from core.llm import HuggingFaceLLM
from core.models import Query, Chunk, RetrievedDocument, Answer, Document
from typing import List
import time

class RAGPipeline:
    """Retrieval-augmented generation: load, chunk, embed, index, retrieve, answer."""

    # Character budget for the whole prompt. HuggingFaceLLM.generate cuts a
    # prompt longer than 400 characters at its tail, which is where the
    # question sits, so the prompt is kept within that limit here.
    MAX_PROMPT_CHARS = 400
    # A chunk that only fits partially is included only when more than this
    # many characters of room are left.
    MIN_PARTIAL_CHUNK_CHARS = 50
    PROMPT_TEMPLATE = "Context:\n{context}\n\nQuestion: {question}\nAnswer:"

    def __init__(self, embedding_model_name: str = None, llm_model_name: str = None, embedding_dim: int = 384):
        """Create the embedding model, the vector store and the language model.

        Args:
            embedding_model_name: Sentence-transformers model name; falls back
                to 'sentence-transformers/all-MiniLM-L6-v2' when empty.
            llm_model_name: Hugging Face model name; falls back to
                'google/flan-t5-base' when empty.
            embedding_dim: Size of the vectors stored in the FAISS index; it
                must match the size of the vectors the embedding model emits.
        """
        self.embedding = EmbeddingModel(model_name=embedding_model_name or 'sentence-transformers/all-MiniLM-L6-v2')
        self.vector_store = FAISSVectorStore(embedding_dim)
        self.llm = HuggingFaceLLM(model_name=llm_model_name or 'google/flan-t5-base')
        self.documents = []

    def add_documents(self, paths: List[str]):
        """Load the files in ``paths``, split them into chunks and index them.

        Args:
            paths: File paths handed to ``DocumentLoader.load_documents``.

        Returns:
            The list of loaded documents (also appended to ``self.documents``).
        """
        docs = DocumentLoader.load_documents(paths)
        chunks = []
        for doc in docs:
            # Split each document into smaller chunks.
            for i, chunk_content in enumerate(self._split_document(doc)):
                chunks.append(Chunk(id=f"{doc.id}_chunk_{i}", document_id=doc.id, content=chunk_content))
        if chunks:
            # Embed all chunks in one call instead of one model call per chunk.
            embeddings = self.embedding.embed([chunk.content for chunk in chunks])
            for chunk, embedding in zip(chunks, embeddings):
                chunk.embedding = embedding
            # Only touch the index when there is something to add; an empty
            # batch cannot be converted into a matrix of vectors.
            self.vector_store.add_chunks(chunks)
        self.documents.extend(docs)
        return docs

    def _split_document(self, doc: Document, chunk_size: int = 1000) -> List[str]:
        """Split a document into chunks of at most ``chunk_size`` characters.

        Paragraphs (separated by blank lines) are packed together while they
        fit. A paragraph longer than ``chunk_size`` is split on whitespace; a
        single word longer than ``chunk_size`` becomes its own oversized chunk.

        Args:
            doc: Document whose ``content`` is split.
            chunk_size: Target maximum chunk length in characters.

        Returns:
            The stripped, non-blank chunks in document order; an empty list
            when the document contains no text.
        """
        chunks = []
        current_chunk = ""

        # Split by paragraphs first.
        for paragraph in doc.content.split('\n\n'):
            if len(paragraph) > chunk_size:
                # Flush what has been accumulated before handling the
                # oversized paragraph on its own.
                if current_chunk.strip():
                    chunks.append(current_chunk.strip())
                    current_chunk = ""

                # Break the oversized paragraph into word-aligned pieces.
                temp_chunk = ""
                for word in paragraph.split():
                    if len(temp_chunk + " " + word) > chunk_size and temp_chunk:
                        chunks.append(temp_chunk.strip())
                        temp_chunk = word
                    else:
                        temp_chunk += " " + word if temp_chunk else word

                if temp_chunk.strip():
                    chunks.append(temp_chunk.strip())

            elif len(current_chunk + "\n\n" + paragraph) <= chunk_size:
                # The paragraph still fits into the current chunk.
                current_chunk += "\n\n" + paragraph if current_chunk else paragraph
            else:
                # Close the current chunk and start a new one with this paragraph.
                if current_chunk.strip():
                    chunks.append(current_chunk.strip())
                current_chunk = paragraph

        # Flush the last chunk.
        if current_chunk.strip():
            chunks.append(current_chunk.strip())

        # No chunk at this point means the document holds only whitespace;
        # return nothing rather than indexing a blank chunk.
        return chunks

    def _build_prompt(self, question: str, chunk_texts: List[str]) -> str:
        """Build the LLM prompt, fitting as much context as the budget allows.

        The question and the fixed template text are reserved first; the
        chunks then fill the remaining characters in retrieval order, the
        last one possibly truncated with a trailing "...".
        """
        reserved = len(self.PROMPT_TEMPLATE.format(context="", question=question))
        budget = self.MAX_PROMPT_CHARS - reserved

        context_parts = []
        used = 0
        for text in chunk_texts:
            separator = 1 if context_parts else 0  # the '\n' joining two parts
            available = budget - used - separator
            if len(text) <= available:
                context_parts.append(text)
                used += separator + len(text)
                continue
            # Add a slice of the chunk only when a meaningful amount fits;
            # three characters are kept for the ellipsis.
            if available > self.MIN_PARTIAL_CHUNK_CHARS:
                context_parts.append(text[:available - 3] + "...")
            break

        return self.PROMPT_TEMPLATE.format(context='\n'.join(context_parts), question=question)

    def query(self, question: str, top_k: int = 5) -> Answer:
        """Answer ``question`` using the ``top_k`` closest indexed chunks.

        Args:
            question: Question to answer.
            top_k: Maximum number of chunks to retrieve.

        Returns:
            An ``Answer`` with the generated text, the retrieved chunks with
            their scores, and the elapsed time in seconds.
        """
        start = time.time()
        query_embedding = self.embedding.embed([question])[0]
        results = self.vector_store.search(query_embedding, top_k=top_k)
        # float() turns a numpy scalar coming from the index into a plain float.
        retrieved = [
            RetrievedDocument(chunk=chunk, relevance_score=float(score))
            for chunk, score in results
        ]

        prompt = self._build_prompt(question, [r.chunk.content for r in retrieved])
        answer_text = self.llm.generate(prompt)
        end = time.time()
        return Answer(answer=answer_text, retrieved_documents=retrieved, processing_time=end - start)

# RODANDO APLICAÇÃO

In [None]:
import sys
from core.rag_pipeline import RAGPipeline
from config.settings import settings
from utils.helpers import setup_logging, format_duration

if __name__ == "__main__":
    # Project helper from utils.helpers; called once before anything else runs.
    setup_logging()

    # Pipeline configuration comes entirely from the project settings object.
    pipeline_options = {
        "embedding_model_name": settings.embedding_model_name,
        "llm_model_name": settings.llm_model_name,
        "embedding_dim": settings.embedding_dim,
    }
    pipeline = RAGPipeline(**pipeline_options)

    # Example: index a document.
    source_files = ["doencas_respiratorias_cronicas.pdf"]
    pipeline.add_documents(source_files)

    # Example: ask a question and report the answer, timing and hit count.
    user_question = "Qual é a prevalência e o impacto das principais doenças respiratórias crônicas no Brasil, segundo dados do Ministério da Saúde?"
    result = pipeline.query(user_question)
    print(f"Resposta: {result.answer}")
    print(f"Tempo de processamento: {format_duration(result.processing_time)}")
    print(f"Documentos recuperados: {len(result.retrieved_documents)}")