In [1]:
import os
import google.generativeai as genai
from dotenv import load_dotenv
load_dotenv()
import json
import numpy as np

# Initialize embeddings model
genai.configure(api_key=os.getenv("GOOGLE_API_KEY"))

# Cache for knowledge base data and embeddings
_knowledge_base_cache = None
_embeddings_cache = None

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
from pathlib import Path
import json

# Define the path
DB_FILE = Path("../data/mock_knowledge_base.json")

# Load the database
def load_db():
    if not DB_FILE.exists():
        return {}
    with open(DB_FILE, "r") as f:
        return json.load(f)

load_db()

{'https://www.infinitepay.io': 'InfinitePay Cupom de R$ {discount} de desconto ativado ü§ë Aceite as principais bandeiras e carteiras digitais 0 milh√µes de clientes +R$ 0 bilh√µes de economia em taxas 0% das cidades brasileiras InfiniteTap Transforme seu celular em maquininha de cart√£o gr√°tis em menos de 5 minutos. Taxas a partir de: D√©bito 1,37% 1x 3,15% 12x 12,40% Receba suas vendas na hora Android e iOS\xa0com NFC Parcele em at√© 12x Pix taxa Zero Investimento Zero Conta Digital Gratuita com Link de Pagamento Cadastre-se Gr√°tis CPF/CNPJ Maquininha Smart As melhores taxas para parcelar em at√© 12x e receber na hora ou 1 dia √∫til. Sem aluguel ou fidelidade! Taxas para quem fatura at√© 20mil: D√©bito 1,37% 1x 3,15% 12x 12,40% Pix taxa Zero Impress√£o de comprovante Bateria de alta dura√ß√£o Gest√£o de Vendas e Estoque Conta Digital Gratuita com Link de Pagamento Frete Gr√°tis 12x de R$ 16,58 Compre com Desconto CNPJ Link de Pagamento Venda online √† dist√¢ncia de forma segura e 

In [3]:
def load_knowledge_base() -> list[dict]:
    """Load knowledge base data from mock.json file."""
    global _knowledge_base_cache

    if _knowledge_base_cache is not None:
        return _knowledge_base_cache

    notebook_dir = os.getcwd()
    parent_dir = os.path.dirname(notebook_dir)

    mock_data_path = os.path.join(
        parent_dir,
        "data",
        "mock_knowledge_base.json"
    )

    with open(mock_data_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    _knowledge_base_cache = data
    return _knowledge_base_cache

load_knowledge_base()

{'https://www.infinitepay.io': 'InfinitePay Cupom de R$ {discount} de desconto ativado ü§ë Aceite as principais bandeiras e carteiras digitais 0 milh√µes de clientes +R$ 0 bilh√µes de economia em taxas 0% das cidades brasileiras InfiniteTap Transforme seu celular em maquininha de cart√£o gr√°tis em menos de 5 minutos. Taxas a partir de: D√©bito 1,37% 1x 3,15% 12x 12,40% Receba suas vendas na hora Android e iOS\xa0com NFC Parcele em at√© 12x Pix taxa Zero Investimento Zero Conta Digital Gratuita com Link de Pagamento Cadastre-se Gr√°tis CPF/CNPJ Maquininha Smart As melhores taxas para parcelar em at√© 12x e receber na hora ou 1 dia √∫til. Sem aluguel ou fidelidade! Taxas para quem fatura at√© 20mil: D√©bito 1,37% 1x 3,15% 12x 12,40% Pix taxa Zero Impress√£o de comprovante Bateria de alta dura√ß√£o Gest√£o de Vendas e Estoque Conta Digital Gratuita com Link de Pagamento Frete Gr√°tis 12x de R$ 16,58 Compre com Desconto CNPJ Link de Pagamento Venda online √† dist√¢ncia de forma segura e 

In [None]:
def get_embedding(text: str, model: str = "models/text-embedding-004") -> np.ndarray:
    """Generate embedding for given text using Google's embedding model."""
    result = genai.embed_content(
        model=model,
        content=text,
        task_type="retrieval_document"
    )
    return np.array(result['embedding'])


def compute_embeddings(knowledge_base: list[dict]) -> list[np.ndarray]:
    """Compute embeddings for all content in the knowledge base."""
    global _embeddings_cache

    if _embeddings_cache is not None:
        return _embeddings_cache

    embeddings = []
    for item in knowledge_base:
        content = item.get("content", "")
        title = item.get("title", "")

        # Combine title and content for better context
        text_to_embed = f"{title}\n\n{content}"
        embedding = get_embedding(text_to_embed)
        embeddings.append(embedding)

    _embeddings_cache = embeddings
    return embeddings


def cosine_similarity(vec1: np.ndarray, vec2: np.ndarray) -> float:
    """Calculate cosine similarity between two vectors."""
    dot_product = np.dot(vec1, vec2)
    norm1 = np.linalg.norm(vec1)
    norm2 = np.linalg.norm(vec2)
    return dot_product / (norm1 * norm2)


def query_knowledge_base(query: str, top_k: int = 2) -> str:
    """
    Query the knowledge base using semantic search with embeddings.

    Args:
        query: The user's search query
        top_k: Number of top results to return (default: 2)

    Returns:
        A formatted string containing the most relevant results
    """
    # Load knowledge base and compute embeddings
    knowledge_base = load_knowledge_base()
    kb_embeddings = compute_embeddings(knowledge_base)

    # Get query embedding
    query_embedding = get_embedding(query, model="models/text-embedding-004")

    # Calculate similarities
    similarities = []
    for i, kb_embedding in enumerate(kb_embeddings):
        similarity = cosine_similarity(query_embedding, kb_embedding)
        similarities.append((i, similarity))

    # Sort by similarity (descending)
    similarities.sort(key=lambda x: x[1], reverse=True)

    # Get top-k results
    top_results = similarities[:top_k]

    # Format results
    results = []
    for idx, score in top_results:
        item = knowledge_base[idx]
        results.append(f"""
**{item['title']}** (Relevance: {score:.2f})
URL: {item['url']}

{item['content']}
""")

    return "\n---\n".join(results)

In [None]:
ans = query_knowledge_base("A InfinitePay anuncia meus produtos?")

In [None]:

print(ans)