# 🔍 RAG API - Sistema de Busca Semântica

## 📋 O que este notebook faz

Este notebook implementa um **sistema de RAG (Retrieval-Augmented Generation)** com busca semântica híbrida:

- 🔍 **Busca vetorial** usando embeddings BAAI/bge-m3
- 🏷️ **Filtros por metadata** (repo, branch, arquivo)
- 🎯 **Pesquisa híbrida** combinando vetores e metadata
- ⚡ **Cache em memória** para queries repetidas (TTL de 5 minutos, com chave derivada da query e dos parâmetros de busca)

## 🌐 Endpoints Disponíveis

- `GET /api/v1/search` - Busca semântica híbrida
- `POST /api/v1/search` - Busca com payload complexo
- `GET /api/v1/search/similar/{point_id}` - Documentos similares
- `GET /api/v1/search/metadata` - Busca apenas por metadata
- `GET /api/v1/search/stats` - Estatísticas da collection
- `GET /processa/info` - Informações sobre a Processa Sistemas via RAG

---

## 🔧 Configuração e Imports

In [None]:
import os
import json
import hashlib
import time
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Optional, Any
from collections import defaultdict

# Carregar configurações do ambiente (mesma abordagem do ETL)
from dotenv import load_dotenv

# Load "../.env.<ENVIRONMENT>" (ENVIRONMENT defaults to "development");
# fall back to "../.env" when the environment-specific file does not exist.
env_file = ".env." + os.getenv('ENVIRONMENT', 'development')
if Path("..", env_file).exists():
    load_dotenv(f"../{env_file}")
elif Path("../.env").exists():
    load_dotenv("../.env")

# Qdrant
from qdrant_client import QdrantClient
from qdrant_client.models import (
    Filter, FieldCondition, MatchValue, Range
)

# Embeddings
from sentence_transformers import SentenceTransformer
import numpy as np

# Connection and model settings, each overridable through an environment variable
QDRANT_URL = os.environ.get("QDRANT_URL", "http://qdrant.codrstudio.dev:6333")
QDRANT_API_KEY = os.environ.get("QDRANT_API_KEY")
COLLECTION_NAME = os.environ.get("QDRANT_COLLECTION", "nic")
EMBEDDING_MODEL = os.environ.get("EMBEDDING_MODEL", "BAAI/bge-m3")

# Search defaults and limits
DEFAULT_TOP_K = 20
MAX_TOP_K = 50
DEFAULT_SCORE_THRESHOLD = float(os.environ.get("DEFAULT_SCORE_THRESHOLD", "0.6"))

# In-memory query cache and its time-to-live in seconds (5 minutes)
CACHE_TTL = 5 * 60
query_cache = dict()

# Echo the effective configuration; only the presence of the API key is shown
print("\n".join([
    "🔧 Configuração RAG API:",
    f"  Qdrant URL: {QDRANT_URL}",
    f"  Collection: {COLLECTION_NAME}",
    f"  Modelo: {EMBEDDING_MODEL}",
    f"  Default Top K: {DEFAULT_TOP_K}",
    f"  Score Threshold: {DEFAULT_SCORE_THRESHOLD}",
    f"  API Key: {'✅ Carregada' if QDRANT_API_KEY else '❌ Não encontrada'}",
]))

## 🔗 Conexão com Qdrant

In [None]:
# Connect to Qdrant and read the target collection up front, so that a bad
# URL, API key or collection name fails here instead of on the first search.
try:
    client = QdrantClient(api_key=QDRANT_API_KEY, url=QDRANT_URL)
    collection_info = client.get_collection(collection_name=COLLECTION_NAME)

    print("✅ Conectado ao Qdrant")
    print("📊 Collection '{}': {} pontos".format(COLLECTION_NAME, collection_info.points_count))

    # NOTE(review): `.vectors.size` presumably assumes a single unnamed vector
    # configuration for the collection — confirm.
    VECTOR_SIZE = collection_info.config.params.vectors.size

except Exception as e:
    # Report the failure, then re-raise so execution stops here.
    print("❌ Erro ao conectar ao Qdrant: {}".format(e))
    raise

## 🤖 Modelo de Embeddings

In [None]:
# Module-level slot holding the single shared model instance (created lazily)
_model_instance = None

def get_embedding_model():
    """Return the shared SentenceTransformer, loading EMBEDDING_MODEL on first call."""
    global _model_instance
    if _model_instance is not None:
        return _model_instance

    print(f"🤖 Carregando modelo {EMBEDDING_MODEL}...")
    _model_instance = SentenceTransformer(EMBEDDING_MODEL)
    dimensions = _model_instance.get_sentence_embedding_dimension()
    print(f"✅ Modelo carregado: {dimensions} dimensões")
    return _model_instance

def generate_embedding(text: str) -> List[float]:
    """Encode *text* with the shared model and return the vector as a plain list.

    The model is asked to normalize the embedding (``normalize_embeddings=True``)
    and the progress bar is disabled.
    """
    vector = get_embedding_model().encode(
        text,
        normalize_embeddings=True,
        show_progress_bar=False,
    )
    return vector.tolist()

# Load the model eagerly so the first search request does not pay the load time
get_embedding_model()
print("📏 Modelo pronto para uso")

## 🏷️ Filtros de Metadata

In [None]:
def build_metadata_filter(filters: Optional[Dict[str, Any]] = None) -> Optional[Filter]:
    """Build a Qdrant ``Filter`` from a dict of search parameters.

    Args:
        filters: Mapping of filter names to values. Recognised keys are the
            exact-match fields ``repo``, ``branch``, ``relpath``,
            ``source_document``, ``lang`` and ``embed_model_major``, plus the
            optional range bounds ``date_from`` / ``date_to`` applied to the
            ``last_updated`` payload field. Unknown keys are ignored.

    Returns:
        A ``Filter`` whose ``must`` list holds one condition per usable entry,
        or ``None`` when *filters* is empty or yields no condition.
    """
    if not filters:
        return None

    # Exact-match string fields; falsy values (None, "") are skipped
    string_fields = ('repo', 'branch', 'relpath', 'source_document', 'lang', 'embed_model_major')
    conditions = [
        FieldCondition(key=field, match=MatchValue(value=filters[field]))
        for field in string_fields
        if filters.get(field)
    ]

    # Date range on `last_updated`. Bounds that are missing, None or empty are
    # skipped so that e.g. {'date_from': None} does not add a bound-less range
    # condition to the filter.
    # NOTE(review): `Range` is a numeric range in qdrant-client; if
    # `last_updated` is stored as an ISO datetime string this presumably needs
    # `DatetimeRange` instead — confirm against the payload schema.
    date_range = {}
    for bound, source_key in (('gte', 'date_from'), ('lte', 'date_to')):
        value = filters.get(source_key)
        if value is not None and value != '':
            date_range[bound] = value
    if date_range:
        conditions.append(
            FieldCondition(key='last_updated', range=Range(**date_range))
        )

    return Filter(must=conditions) if conditions else None

## 🎯 Função Principal de Busca RAG

In [None]:
def _extract_highlights(query_words, text, max_highlights=3, context=30):
    """Return up to *max_highlights* snippets of *text* around matched query words.

    Words of two characters or fewer are ignored. Matching is a case-insensitive
    substring search and only the first occurrence of each word is used. Each
    snippet carries *context* characters on both sides and is marked with '...'
    wherever it was cut.
    """
    highlights = []
    text_lower = text.lower()
    for word in query_words:
        if len(highlights) >= max_highlights:
            break
        if len(word) <= 2:
            continue
        idx = text_lower.find(word)
        if idx < 0:
            continue
        start = max(0, idx - context)
        end = min(len(text), idx + len(word) + context)
        snippet = text[start:end]
        if start > 0:
            snippet = '...' + snippet
        if end < len(text):
            snippet = snippet + '...'
        highlights.append(snippet)
    return highlights


def hybrid_search(
    query: str,
    top_k: int = DEFAULT_TOP_K,
    score_threshold: float = DEFAULT_SCORE_THRESHOLD,
    filters: Optional[Dict[str, Any]] = None,
    include_embeddings: bool = False
) -> Dict[str, Any]:
    """Run a vector search in Qdrant, optionally restricted by metadata filters.

    Args:
        query: Free-text query; it is embedded with ``generate_embedding``.
        top_k: Maximum number of hits, clamped to ``[1, MAX_TOP_K]``.
        score_threshold: Minimum score passed to Qdrant.
        filters: Metadata filters understood by ``build_metadata_filter``.
        include_embeddings: When true, the query vector and a short
            description of it are added to the response.

    Returns:
        A dict with ``query``, ``total_results``, ``results`` and
        ``search_metadata``. Responses served from the in-memory cache have
        ``search_metadata['from_cache']`` set to ``True`` and also carry a
        top-level ``from_cache`` key (kept for existing consumers).
    """
    start_time = time.time()

    # Clamp to a valid range; the HTTP endpoints already do the same
    top_k = max(1, min(top_k, MAX_TOP_K))

    # Cache key covers every input that influences the response
    cache_key = hashlib.md5(
        f"{query}:{top_k}:{score_threshold}:{json.dumps(filters or {}, sort_keys=True)}:{include_embeddings}".encode()
    ).hexdigest()

    # Serve from cache while the entry is fresh
    cached_entry = query_cache.get(cache_key)
    if cached_entry is not None:
        cached_result, cached_time = cached_entry
        if time.time() - cached_time < CACHE_TTL:
            # Return a copy so the stored entry is never mutated, and flag the
            # hit inside search_metadata, where 'from_cache' is declared.
            cached_response = dict(cached_result)
            cached_response['search_metadata'] = dict(
                cached_result['search_metadata'], from_cache=True
            )
            cached_response['from_cache'] = True
            return cached_response

    query_embedding = generate_embedding(query)
    search_filter = build_metadata_filter(filters)

    search_results = client.search(
        collection_name=COLLECTION_NAME,
        query_vector=query_embedding,
        query_filter=search_filter,
        limit=top_k,
        score_threshold=score_threshold,
        with_payload=True,
        with_vectors=False
    )

    # The query words are the same for every hit, so split them once
    query_words = query.lower().split()

    results = []
    for hit in search_results:
        payload = hit.payload or {}
        text = payload.get('text') or ''
        result = {
            'score': round(hit.score, 4),
            'text': text,
            'metadata': {
                'chunk_id': payload.get('chunk_id'),
                'chunk_index': payload.get('chunk_index'),
                'source_document': payload.get('source_document'),
                'repo': payload.get('repo'),
                'branch': payload.get('branch'),
                'commit': payload.get('commit'),
                'last_updated': payload.get('last_updated')
            },
            'point_id': hit.id
        }

        highlights = _extract_highlights(query_words, text)
        if highlights:
            result['highlights'] = highlights

        results.append(result)

    response = {
        'query': query,
        'total_results': len(results),
        'results': results,
        'search_metadata': {
            'model': EMBEDDING_MODEL,
            'collection': COLLECTION_NAME,
            'top_k': top_k,
            'score_threshold': score_threshold,
            'filters_applied': filters or {},
            'search_time_ms': round((time.time() - start_time) * 1000, 2),
            'from_cache': False
        }
    }

    if include_embeddings:
        response['query_embedding'] = query_embedding
        response['query_embedding_info'] = {
            'dimensions': len(query_embedding),
            'model': EMBEDDING_MODEL,
            'normalized': True,
            'magnitude': round(float(np.linalg.norm(query_embedding)), 6)
        }

    query_cache[cache_key] = (response, time.time())

    # Keep the cache bounded: past 100 entries, drop the 50 oldest
    if len(query_cache) > 100:
        oldest_first = sorted(query_cache.items(), key=lambda item: item[1][1])
        for key, _ in oldest_first[:50]:
            del query_cache[key]

    return response

## 🌐 Endpoint: Busca GET

In [None]:
# GET /api/v1/search
def parse_query_params():
    """Parse the search parameters of a GET request from the ``REQUEST`` global.

    ``REQUEST`` may be a dict or a JSON-encoded string. Query-string values are
    read from its ``'query'`` entry or, when that is absent, from ``'args'``
    (the key ``get_processa_info`` reads). Each value may be a plain value or a
    list, in which case the first element is used.
    NOTE(review): presumably this runs under Jupyter Kernel Gateway, where
    REQUEST is a JSON string with the query string under 'args' — confirm
    against the deployment.

    Returns:
        A tuple ``(query, top_k, score_threshold, filters)``. ``top_k`` is
        clamped to ``[1, MAX_TOP_K]`` and ``score_threshold`` to ``[0, 1]``;
        values that cannot be converted fall back to the defaults. When
        ``REQUEST`` is missing or unreadable, a fixed sample query with default
        parameters is returned so the cell can still be run interactively.
    """
    def _first(value):
        # Query-string values may arrive as lists; use the first element
        if isinstance(value, list):
            return value[0] if value else None
        return value

    def _clamped(params, name, cast, default, low, high):
        # Convert params[name] with `cast` and clamp it; default on any failure
        if name not in params:
            return default
        try:
            return max(low, min(cast(_first(params[name])), high))
        except (TypeError, ValueError, OverflowError):
            return default

    try:
        request = REQUEST
        if isinstance(request, str):
            request = json.loads(request)
        params = request.get('query')
        if not isinstance(params, dict):
            params = request.get('args')
    except (NameError, KeyError, AttributeError, ValueError) as e:
        print(f"⚠️ Erro no parse de query params: {e}")
        return "Quem é a processa?", DEFAULT_TOP_K, DEFAULT_SCORE_THRESHOLD, {}

    if not isinstance(params, dict):
        params = {}

    # Free-text query; stripped, as the POST endpoint does
    raw_query = _first(params.get('q'))
    query = str(raw_query).strip() if raw_query else ""

    top_k = _clamped(params, 'top_k', int, DEFAULT_TOP_K, 1, MAX_TOP_K)
    score_threshold = _clamped(
        params, 'score_threshold', float, DEFAULT_SCORE_THRESHOLD, 0.0, 1.0
    )

    # Metadata filters: keep only non-blank values
    filters = {}
    for key in ['repo', 'branch', 'relpath', 'source_document', 'lang']:
        raw_value = _first(params.get(key))
        if not raw_value:
            continue
        filter_val = str(raw_value).strip()
        if filter_val:
            filters[key] = filter_val

    return query, top_k, score_threshold, filters

# Run the GET search: parse the request, then either search or explain that
# the "q" parameter is mandatory. The JSON printed below is the cell's output.
query, top_k, score_threshold, filters = parse_query_params()

if query:
    response = hybrid_search(
        query=query,
        top_k=top_k,
        score_threshold=score_threshold,
        filters=filters or None
    )
else:
    response = {
        'error': 'Query parameter "q" is required',
        'example': '/api/v1/search?q=texto+de+busca&top_k=20'
    }

print(json.dumps(response, ensure_ascii=False, indent=2))

## 🌐 Endpoint: Busca POST

In [None]:
# POST /api/v1/search
def parse_post_body():
    """Return the JSON body of a POST request as a dict.

    ``REQUEST`` may be a dict or a JSON-encoded string, and its ``'body'``
    entry may itself be a JSON string or an already-decoded dict.
    NOTE(review): presumably this runs under Jupyter Kernel Gateway, where
    REQUEST is a JSON string — confirm against the deployment.

    Returns:
        The decoded body when it is a JSON object. When ``REQUEST`` is missing,
        the body is not valid JSON, or it does not decode to a dict, a fixed
        sample request is returned so the cell can still be run interactively.
    """
    fallback = {'query': 'Quem é a processa?', 'top_k': DEFAULT_TOP_K}
    try:
        request = REQUEST
        if isinstance(request, str):
            request = json.loads(request)
        body = request.get('body', '{}')
        if isinstance(body, str):
            body = json.loads(body)
    except (NameError, json.JSONDecodeError, AttributeError) as e:
        print(f"⚠️ Erro no parse do body: {e}")
        return fallback

    # A JSON array/scalar is not a usable request; callers index by field name
    return body if isinstance(body, dict) else fallback

# Run the POST search. Every field of the body is validated before use so that
# a malformed request produces an error payload or default values, not an
# unhandled exception.
request_data = parse_post_body()
if not isinstance(request_data, dict):
    request_data = {}

if not isinstance(request_data.get('query'), str) or not request_data['query'].strip():
    response = {
        'error': 'Field "query" is required in request body',
        'example': {
            'query': 'texto de busca',
            'top_k': 20,
            'score_threshold': 0.7,
            'include_embeddings': True,
            'filters': {'repo': 'nic/documentacao', 'branch': 'main'}
        }
    }
else:
    # Coerce and clamp numeric parameters; fall back to defaults when a value
    # is missing or cannot be converted (e.g. null or a non-numeric string)
    try:
        top_k = int(request_data.get('top_k', DEFAULT_TOP_K))
    except (TypeError, ValueError, OverflowError):
        top_k = DEFAULT_TOP_K
    top_k = max(1, min(top_k, MAX_TOP_K))

    try:
        score_threshold = float(request_data.get('score_threshold', DEFAULT_SCORE_THRESHOLD))
    except (TypeError, ValueError):
        score_threshold = DEFAULT_SCORE_THRESHOLD
    score_threshold = max(0.0, min(score_threshold, 1.0))

    response = hybrid_search(
        query=request_data['query'].strip(),
        top_k=top_k,
        score_threshold=score_threshold,
        # Only a JSON object is a valid filter set
        filters=request_data.get('filters') if isinstance(request_data.get('filters'), dict) else None,
        include_embeddings=bool(request_data.get('include_embeddings', False))
    )

print(json.dumps(response, indent=2, ensure_ascii=False))

## 🔄 Endpoint: Busca por Similaridade

In [None]:
# GET /api/v1/search/similar/<point_id>
def find_similar_documents(point_id: int, top_k: int = DEFAULT_TOP_K, score_threshold: float = 0.8) -> Dict[str, Any]:
    """Find stored points whose vectors are similar to an existing point.

    Args:
        point_id: Id of the reference point in the collection.
        top_k: Maximum number of similar points, clamped to ``[1, MAX_TOP_K]``.
        score_threshold: Minimum score passed to Qdrant.

    Returns:
        A dict with the reference point (``original``), the list of
        ``similar_documents`` (text previews of at most 200 characters),
        ``total_similar`` and ``search_metadata``. On any failure, or when the
        point does not exist, a dict with ``error`` and ``point_id`` is
        returned instead of raising.
    """
    start_time = time.time()
    top_k = max(1, min(top_k, MAX_TOP_K))

    def _preview(text, limit=200):
        # Append the ellipsis only when the text was actually cut
        text = text or ''
        return text[:limit] + '...' if len(text) > limit else text

    try:
        # Fetch the reference point together with its vector
        original_points = client.retrieve(
            collection_name=COLLECTION_NAME,
            ids=[point_id],
            with_vectors=True,
            with_payload=True
        )

        if not original_points:
            return {'error': f'Point {point_id} not found', 'point_id': point_id}

        original = original_points[0]
        original_payload = original.payload or {}

        similar_results = client.search(
            collection_name=COLLECTION_NAME,
            query_vector=original.vector,
            limit=top_k + 1,  # +1 because the reference point itself is returned
            score_threshold=score_threshold,
            with_payload=True,
            with_vectors=False
        )

        # Drop the reference point; compare as strings so an id given as "12"
        # still matches the stored id 12
        results = []
        for hit in similar_results:
            if str(hit.id) == str(point_id):
                continue
            payload = hit.payload or {}
            results.append({
                'score': round(hit.score, 4),
                'text': _preview(payload.get('text')),
                'metadata': {
                    'chunk_id': payload.get('chunk_id'),
                    'source_document': payload.get('source_document'),
                    'chunk_index': payload.get('chunk_index')
                },
                'point_id': hit.id
            })
        results = results[:top_k]

        return {
            'original': {
                'point_id': point_id,
                'text': _preview(original_payload.get('text')),
                'source_document': original_payload.get('source_document')
            },
            'similar_documents': results,
            'total_similar': len(results),
            'search_metadata': {
                'collection': COLLECTION_NAME,
                'score_threshold': score_threshold,
                'search_time_ms': round((time.time() - start_time) * 1000, 2)
            }
        }

    except Exception as e:
        # Endpoint boundary: report the failure in the response payload
        return {'error': str(e), 'point_id': point_id}

## 📊 Endpoint: Busca por Metadata

In [None]:
# GET /api/v1/search/metadata
def search_by_metadata(filters: Dict[str, Any], limit: int = 20, offset: int = 0) -> Dict[str, Any]:
    """List points matching metadata filters only, without any vector search.

    Args:
        filters: Metadata filters understood by ``build_metadata_filter``;
            at least one usable filter is required.
        limit: Page size passed to ``client.scroll``.
        offset: Scroll offset passed to ``client.scroll``.
            NOTE(review): Qdrant's scroll offset is presumably a point id
            (the value returned as ``next_offset``), not a row count — confirm.

    Returns:
        A dict with the matched points grouped by ``source_document`` (up to
        three chunk previews per document, ordered by ``chunk_index``),
        pagination data and ``search_metadata``; or a dict with ``error`` when
        no usable filter was given.
    """
    start_time = time.time()

    if not filters:
        return {
            'error': 'At least one filter is required',
            'available_filters': ['repo', 'branch', 'relpath', 'source_document', 'lang']
        }

    search_filter = build_metadata_filter(filters)
    if not search_filter:
        return {'error': 'Invalid filters provided', 'filters': filters}

    results, next_offset = client.scroll(
        collection_name=COLLECTION_NAME,
        scroll_filter=search_filter,
        limit=limit,
        offset=offset,
        with_payload=True,
        with_vectors=False
    )

    def _preview(text, max_chars=100):
        # Append the ellipsis only when the text was actually cut
        text = text or ''
        return text[:max_chars] + '...' if len(text) > max_chars else text

    def _chunk_order(chunk):
        # Chunks without an index sort last instead of raising on None < int
        index = chunk['chunk_index']
        return (index is None, 0 if index is None else index)

    # Group the page of points by their source document
    documents = defaultdict(list)
    for point in results:
        payload = point.payload or {}
        doc_name = payload.get('source_document', 'unknown')
        documents[doc_name].append({
            'chunk_index': payload.get('chunk_index'),
            'text_preview': _preview(payload.get('text')),
            'point_id': point.id
        })

    return {
        'filters': filters,
        'total_points': len(results),
        'documents': [
            {
                'source_document': doc,
                'chunks_count': len(chunks),
                'chunks': sorted(chunks, key=_chunk_order)[:3]
            }
            for doc, chunks in documents.items()
        ],
        'pagination': {
            'limit': limit,
            'offset': offset,
            'has_more': next_offset is not None,
            'next_offset': next_offset
        },
        'search_metadata': {
            'collection': COLLECTION_NAME,
            'search_time_ms': round((time.time() - start_time) * 1000, 2)
        }
    }

## 📈 Endpoint: Estatísticas

In [None]:
# GET /api/v1/search/stats
def get_collection_stats() -> Dict[str, Any]:
    """Return collection, model, search and cache statistics.

    Returns:
        A dict describing the collection (name, point count, status, vector
        size), the embedding model, the search defaults, the query cache, the
        payload field names of one sample point, and a UTC timestamp. On any
        failure a dict with ``error`` and ``collection`` is returned instead
        of raising.
    """
    # Local import: the file-level import only brings in `datetime`
    from datetime import timezone

    try:
        collection_info = client.get_collection(COLLECTION_NAME)

        # Read the payload keys of a single point as the available field names
        sample_points, _ = client.scroll(
            collection_name=COLLECTION_NAME, limit=1, with_payload=True, with_vectors=False
        )
        available_fields = list((sample_points[0].payload or {}).keys()) if sample_points else []

        return {
            'collection': {
                'name': COLLECTION_NAME,
                'points_count': collection_info.points_count,
                'status': collection_info.status,
                'vector_size': collection_info.config.params.vectors.size,
                # NOTE(review): hard-coded, not read from the collection config
                'distance_metric': 'COSINE'
            },
            'model': {
                'name': EMBEDDING_MODEL,
                'dimensions': VECTOR_SIZE
            },
            'search_config': {
                'default_top_k': DEFAULT_TOP_K,
                'max_top_k': MAX_TOP_K,
                'default_score_threshold': DEFAULT_SCORE_THRESHOLD
            },
            'cache': {
                'entries': len(query_cache),
                'ttl_seconds': CACHE_TTL
            },
            'available_metadata_fields': available_fields,
            'api_version': '1.0.0',
            # The 'Z' suffix denotes UTC, so take the time in UTC rather than
            # local time; tzinfo is dropped to keep the "...Z" output format
            'timestamp': datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + 'Z'
        }

    except Exception as e:
        # Endpoint boundary: report the failure in the response payload
        return {'error': str(e), 'collection': COLLECTION_NAME}

# Run the stats endpoint; the JSON printed here is the cell's output
stats = get_collection_stats()
print(json.dumps(stats, ensure_ascii=False, indent=2))

In [None]:
# GET /processa/info
def get_processa_info():
    """Answer "who is Processa Sistemas" from the indexed documents.

    An optional ``query`` request argument (read from ``REQUEST['args']``)
    replaces the default question. ``REQUEST`` may be a dict or a JSON-encoded
    string; when it is missing or unreadable the default question is used.
    NOTE(review): presumably this runs under Jupyter Kernel Gateway, where
    REQUEST is a JSON string — confirm against the deployment.

    Returns:
        With results: a dict with ``status='success'``, the query, a
        ``company_info`` section (description taken from the best hit and
        truncated to 500 characters, up to five key sentences, de-duplicated
        sources) and ``search_metadata``. Without results: a dict with
        ``status='no_results'``.
    """
    # Optional custom question from the request arguments
    custom_query = None
    try:
        request = REQUEST
        if isinstance(request, str):
            request = json.loads(request)
        raw_query = request.get('args', {}).get('query')
        # Arguments may arrive as lists; a plain string is used whole
        if isinstance(raw_query, (list, tuple)):
            raw_query = raw_query[0] if raw_query else None
        if isinstance(raw_query, str) and raw_query.strip():
            custom_query = raw_query.strip()
    except (NameError, KeyError, AttributeError, ValueError):
        custom_query = None

    query = custom_query or "Quem é a Processa Sistemas"

    search_results = hybrid_search(
        query=query,
        top_k=10,
        score_threshold=0.5,
        filters=None,
        include_embeddings=False
    )

    results = search_results.get('results')
    if not results:
        return {
            "status": "no_results",
            "message": "Nenhuma informação encontrada sobre a Processa",
            "query": query,
            "timestamp": datetime.now().isoformat()
        }

    # Main description: text of the best hit, capped at 500 characters
    description = results[0]['text']
    if len(description) > 500:
        description = description[:497] + "..."

    # Key points: the first two sentences of each of the top five hits,
    # skipping short fragments and duplicates, at most five in total
    key_points = []
    seen = set()
    for result in results[:5]:
        for sentence in result['text'].split('.')[:2]:
            sentence = sentence.strip()
            if len(sentence) > 20 and sentence not in seen:
                key_points.append(sentence)
                seen.add(sentence)
                if len(key_points) >= 5:
                    break
        if len(key_points) >= 5:
            break

    # Sources: one entry per distinct document among the top five hits
    sources = []
    seen_sources = set()
    for result in results[:5]:
        # The metadata key always exists but may hold None
        source_name = result['metadata'].get('source_document') or 'unknown'
        if source_name in seen_sources:
            continue
        seen_sources.add(source_name)
        sources.append({
            "document": source_name,
            "relevance_score": result['score'],
            "metadata": result['metadata']
        })

    return {
        "status": "success",
        "query": query,
        "company_info": {
            "name": "Processa Sistemas",
            "description": description,
            "key_points": key_points[:5],
            "sources": sources
        },
        "search_metadata": {
            "total_results": search_results['total_results'],
            "top_score": results[0]['score'],
            "collection": COLLECTION_NAME,
            "model": EMBEDDING_MODEL,
            "search_time_ms": search_results['search_metadata']['search_time_ms']
        },
        "timestamp": datetime.now().isoformat()
    }

# Run the endpoint; the JSON printed here is the cell's output
result = get_processa_info()
print(json.dumps(result, ensure_ascii=False, indent=2))

## 🏢 Endpoint: Informações da Processa

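`GET /processa/info` — retorna informações sobre a Processa Sistemas obtidas via RAG. Aceita o parâmetro opcional `query` para substituir a pergunta padrão.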