In [None]:
import os
%pwd

In [5]:
# Move the kernel's working directory up one level (presumably to the project
# root -- confirm). NOTE: the path is relative, so re-running this cell moves
# up another level each time; restart the kernel before re-executing it.
os.chdir("../")

In [None]:
%pwd

In [7]:
from dataclasses import dataclass
from pathlib import Path

In [8]:
@dataclass(frozen=True)
class DataEmbeddingConfig:
    """Immutable settings for the data-embedding stage.

    Built by ``ConfigurationManager.get_data_embedding_config`` from the
    ``data_embedding`` section of the loaded config. ``frozen=True`` makes
    instances read-only after construction.
    """
    # Stage directory; ConfigurationManager creates it before building this object.
    root_dir: Path
    # Copied from ``data_embedding.data_path``; presumably the dataset to embed -- TODO confirm.
    data_path: Path
    # Model identifier that EmbeddingModel hands to SentenceTransformer.
    model_name: str
    # Copied from ``data_embedding.text_column``; presumably the dataset column
    # holding the text to embed -- TODO confirm (not read in this part of the notebook).
    text_column: str

@dataclass(frozen=True)
class VectorStorageConfig:
    """Immutable settings for the vector store.

    Built by ``ConfigurationManager.get_vector_storage_config`` from the
    ``vector_storage`` section of the loaded config.
    """
    # Copied from ``vector_storage.data_path``; not read by VectorStorage in this
    # part of the notebook (presumably a persistence location -- TODO confirm).
    data_path: Path
    # Expected width of every embedding vector; VectorStorage.add_documents
    # rejects arrays whose second dimension differs from this value.
    embedding_dim: int

In [None]:
from dialogue_rag_chatbot.constants import *
from dialogue_rag_chatbot.utils.common import read_yaml, create_directories

In [10]:
class ConfigurationManager:
    """Read the project's YAML files and hand out typed per-stage configs."""

    def __init__(self, config_filepath=CONFIG_FILE_PATH, params_filepath=PARAMS_FILE_PATH):
        """Load both YAML files and make sure the artifacts root exists.

        Args:
            config_filepath: Location of the main config YAML.
            params_filepath: Location of the params YAML.
        """
        self.config = read_yaml(config_filepath)
        self.params = read_yaml(params_filepath)

        # The artifacts root is created eagerly, as soon as the manager is built.
        artifacts_root = self.config.artifacts_root
        create_directories([artifacts_root])

    def get_data_embedding_config(self) -> DataEmbeddingConfig:
        """Build the embedding-stage config, creating its root directory first.

        Returns:
            A ``DataEmbeddingConfig`` populated from the ``data_embedding``
            section of the loaded config.
        """
        section = self.config.data_embedding

        # Side effect: the stage directory exists once this method returns.
        create_directories([section.root_dir])

        return DataEmbeddingConfig(
            root_dir=section.root_dir,
            data_path=section.data_path,
            model_name=section.model_name,
            text_column=section.text_column,
        )

    def get_vector_storage_config(self) -> VectorStorageConfig:
        """Build the vector-store config from the ``vector_storage`` section.

        Unlike the embedding config, no directory is created here.

        Returns:
            A ``VectorStorageConfig`` populated from the loaded config.
        """
        section = self.config.vector_storage

        return VectorStorageConfig(
            data_path=section.data_path,
            embedding_dim=section.embedding_dim,
        )
    

In [None]:
from dialogue_rag_chatbot.logging import logger
from sentence_transformers import SentenceTransformer
from datasets import load_from_disk
from typing import List, Dict, Any, Tuple
import numpy as np

  from .autonotebook import tqdm as notebook_tqdm


[2025-09-26 17:33:34,838: INFO: utils: NumExpr defaulting to 16 threads.]


In [12]:
class EmbeddingModel:
    """Wrapper that loads a SentenceTransformers model and encodes text with it."""

    def __init__(self, config: DataEmbeddingConfig):
        """Load the model named by ``config.model_name``.

        Args:
            config: Embedding-stage settings; only ``model_name`` is read here.

        Raises:
            Exception: Any failure while loading is logged and then re-raised
                unchanged.
        """
        self.model_name = config.model_name
        try:
            loaded = SentenceTransformer(self.model_name)
            self.model = loaded
            logger.info(f"Granite embedding model '{self.model_name}' loaded successfully")
        except Exception as exc:
            logger.error(f"Failed to load model {self.model_name}: {exc!s}")
            raise

    def encode(self, texts):
        """Turn ``texts`` into a numpy array of embeddings.

        Args:
            texts: The input passed straight to the model's ``encode`` method
                (callers in this notebook pass a list of strings).

        Returns:
            The numpy array produced by the model; its width depends on the
            loaded model.

        Raises:
            Exception: Any failure while encoding is logged and then re-raised
                unchanged.
        """
        try:
            vectors = self.model.encode(texts, convert_to_numpy=True)
            summary = f"Encoded {len(texts)} texts into embeddings of shape {vectors.shape}"
            logger.info(summary)
            return vectors
        except Exception as exc:
            logger.error(f"Error encoding texts: {exc!s}")
            raise


class VectorStorage:
    """In-memory store that pairs documents with their embedding vectors.

    Embeddings are kept as a plain list of rows in ``self.embeddings`` and the
    matching documents in ``self.documents``; row ``i`` of one always belongs
    to entry ``i`` of the other.
    """

    def __init__(self, config: VectorStorageConfig):
        """Create an empty store.

        Args:
            config: Vector-store settings; ``embedding_dim`` fixes the width
                every stored vector must have.
        """
        self.config = config
        self.embedding_dim = self.config.embedding_dim
        self.embeddings = []
        self.documents = []
        logger.info(f"VectorStore initialized with embedding_dim={self.embedding_dim}")

    def add_documents(self, documents: List[Dict[str, Any]], embeddings: np.ndarray):
        """Append documents and their embeddings to the store.

        Args:
            documents: One entry per embedding row, in the same order.
            embeddings: 2-D array of shape ``(len(documents), embedding_dim)``.

        Raises:
            ValueError: If ``embeddings`` is not 2-D, if its width differs from
                ``embedding_dim``, or if the number of rows does not match the
                number of documents.
        """
        if embeddings.ndim != 2:
            raise ValueError(
                f"Embeddings must be a 2-D array of shape (n_documents, {self.embedding_dim}), "
                f"got shape {tuple(embeddings.shape)}"
            )
        if embeddings.shape[1] != self.embedding_dim:
            raise ValueError(f"Embedding dimension mismatch: expected {self.embedding_dim}, got {embeddings.shape[1]}")

        # Materialise once so the length check also works for non-list iterables.
        documents = list(documents)
        # A count mismatch would silently misalign documents and vectors, so
        # reject it before anything is stored.
        if len(documents) != embeddings.shape[0]:
            raise ValueError(
                f"Document/embedding count mismatch: got {len(documents)} documents "
                f"and {embeddings.shape[0]} embeddings"
            )

        self.embeddings.extend(embeddings.tolist())
        self.documents.extend(documents)
        logger.info(f"Added {len(documents)} documents to vector store")

    def similarity_search(self, query_embedding: np.ndarray, top_k: int = 5) -> List[Tuple[Dict[str, Any], float]]:
        """Return the stored documents most similar to ``query_embedding``.

        Similarity is the cosine similarity between the query and every stored
        vector (a small epsilon in the denominator avoids division by zero).

        Args:
            query_embedding: Query vector; it is flattened to 1-D before use.
            top_k: Maximum number of results. If the store holds fewer vectors,
                all of them are returned. Values <= 0 yield an empty list.

        Returns:
            ``(document, score)`` tuples sorted by descending score.
        """
        if not self.embeddings or top_k <= 0:
            return []

        query_embedding = query_embedding.flatten()

        embeddings_matrix = np.array(self.embeddings)
        similarities = np.dot(embeddings_matrix, query_embedding) / (
            np.linalg.norm(embeddings_matrix, axis=1) * np.linalg.norm(query_embedding) + 1e-10
        )

        n_stored = similarities.shape[0]
        if top_k < n_stored:
            # argpartition only accepts kth < n, so it is used solely when a
            # strict subset is requested.
            top_indices = np.argpartition(-similarities, top_k)[:top_k]
        else:
            # Asking for at least as many results as are stored: rank them all.
            top_indices = np.arange(n_stored)
        # Order the selected candidates from most to least similar.
        top_indices = top_indices[np.argsort(-similarities[top_indices])]

        results = [(self.documents[idx], float(similarities[idx])) for idx in top_indices]
        logger.info(f"Retrieved {len(results)} documents from vector store")
        return results

In [None]:
class Retriever:
    """Look up the stored documents that best match a free-text query."""

    def __init__(self, vector_store: VectorStorage, embedding_model: EmbeddingModel):
        """Keep references to the store to search and the model that embeds queries.

        Args:
            vector_store: Store queried via ``similarity_search``.
            embedding_model: Model whose ``encode`` turns the query into a vector.
        """
        self.embedding_model = embedding_model
        self.vector_store = vector_store
        logger.info("Retriever initialized")

    def retrieve(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """Embed ``query`` and return the closest documents, scores dropped.

        This is best-effort: any exception raised while encoding or searching
        is logged and an empty list is returned instead of propagating.

        Args:
            query: User query text.
            top_k: Number of documents requested from the vector store.

        Returns:
            The matching documents in the order the store returned them, or
            ``[]`` if anything failed.
        """
        try:
            # The model is given a one-element batch; take its single vector.
            query_vector = self.embedding_model.encode([query])[0]

            scored_hits = self.vector_store.similarity_search(query_vector, top_k)

            # Callers only want the documents, not their similarity scores.
            documents = [doc for doc, _score in scored_hits]

            logger.info(f"Retrieved {len(documents)} documents for query")
            return documents
        except Exception as exc:
            logger.error(f"Error in retrieval: {exc!s}")
            return []