In [1]:
#!pip install langchain
#!pip install lanchain_core
#!pip install langchain_community
#!pip install pypdf
#!pip install pymupdf
#!pip install sentence_transformers
#!pip install faiss_cpu
#!pip install chromadb

In [2]:
## Document structure
from langchain_core.documents import Document
import os

In [3]:
doc = Document(
    page_content = "This is the main text content i have as reference in creating a rag",
    metadata = {
        "source":"example.txt",
        "pages" :1,
        "author": "Krish Naik",
        "date_created":"2025-11-10"})

In [4]:
## create a simple text file
os.makedirs("..\\python_projects\\Projects_git",exist_ok = True)

In [5]:
os.getcwd()

'c:\\Users\\gunas\\python_projects\\Projects_git'

In [6]:
sample_texts = {"C:\\Users\\gunas\\python_projects\\Projects_git\\Text files\\rag_ref.txt":
                """Ringg AI is a platform providing AI-powered voice assistants designed primarily for business communication automation. It offers no-code tools and APIs to create, deploy, and manage intelligent voice agents that handle high-volume communication tasks such as lead generation, loan collection, last-mile delivery coordination, appointment scheduling, recruitment calls, and customer support. The platform supports multilingual interactions in 20+ languages, provides human-like conversation experiences, and integrates with business tools like Slack, Microsoft Teams, and Discord for seamless workflows.

Key services include:

Automated outbound and inbound calls for tasks like lead qualification, feedback collection, order confirmations, loan follow-ups, and candidate screening.

Embedded voice agents on business websites for real-time customer interaction.

Advanced analytics on call performance and customer engagement.

Scalable infrastructure with high uptime and low latency.

Smart call transfers to human agents maintaining call context.

Ringg AI is used across industries including banking, logistics, healthcare, education, and direct-to-consumer businesses to improve efficiency, increase conversion rates, and enhance customer experience at scale.​

"""}

In [7]:
for filepath, content in sample_texts.items():
    with open (filepath, 'w',encoding = "utf-8") as f:
        f.write(content)
print("Sample text file created!")

Sample text file created!


In [8]:
## Text loader
from langchain_community.document_loaders  import TextLoader
# from langchain_community.document_loaders import TextLoader

loader = TextLoader("C:\\Users\\gunas\\python_projects\\Projects_git\\Text files\\rag_ref.txt", encoding = "utf-8")
document = loader.load()
print(document)


[Document(metadata={'source': 'C:\\Users\\gunas\\python_projects\\Projects_git\\Text files\\rag_ref.txt'}, page_content='Ringg AI is a platform providing AI-powered voice assistants designed primarily for business communication automation. It offers no-code tools and APIs to create, deploy, and manage intelligent voice agents that handle high-volume communication tasks such as lead generation, loan collection, last-mile delivery coordination, appointment scheduling, recruitment calls, and customer support. The platform supports multilingual interactions in 20+ languages, provides human-like conversation experiences, and integrates with business tools like Slack, Microsoft Teams, and Discord for seamless workflows.\n\nKey services include:\n\nAutomated outbound and inbound calls for tasks like lead qualification, feedback collection, order confirmations, loan follow-ups, and candidate screening.\n\nEmbedded voice agents on business websites for real-time customer interaction.\n\nAdvanc

In [9]:
## Directory loader
from langchain_community.document_loaders import DirectoryLoader

## loading all the text files from the directory
dir_loader = DirectoryLoader("C:\\Users\\gunas\\python_projects\\Projects_git\\Text files",
                             glob = "**\\*.txt", # pattern to match text files
                             loader_cls = TextLoader,#loader class to use
                             loader_kwargs = {'encoding':'utf-8'},
                             show_progress = True
                            )

documents = dir_loader.load()
documents

  0%|          | 0/1 [00:00<?, ?it/s]

100%|██████████| 1/1 [00:00<00:00, 503.28it/s]


[Document(metadata={'source': 'C:\\Users\\gunas\\python_projects\\Projects_git\\Text files\\rag_ref.txt'}, page_content='Ringg AI is a platform providing AI-powered voice assistants designed primarily for business communication automation. It offers no-code tools and APIs to create, deploy, and manage intelligent voice agents that handle high-volume communication tasks such as lead generation, loan collection, last-mile delivery coordination, appointment scheduling, recruitment calls, and customer support. The platform supports multilingual interactions in 20+ languages, provides human-like conversation experiences, and integrates with business tools like Slack, Microsoft Teams, and Discord for seamless workflows.\n\nKey services include:\n\nAutomated outbound and inbound calls for tasks like lead qualification, feedback collection, order confirmations, loan follow-ups, and candidate screening.\n\nEmbedded voice agents on business websites for real-time customer interaction.\n\nAdvance

In [10]:
# Ingesting Pdfs
from langchain_community.document_loaders import PyPDFLoader, PyMuPDFLoader

## loading Pdfs
## loading all the text files from the directory
dir_loader = DirectoryLoader("c:\\Users\\gunas\\python_projects\\Projects_git",
                             glob = "**\\*.pdf", # pattern to match text files
                             loader_cls = PyMuPDFLoader,# PDFLoader class to use
                             show_progress = True
                            )

documents = dir_loader.load()
documents

100%|██████████| 2/2 [00:00<00:00,  2.03it/s]


[Document(metadata={'producer': 'Adobe PDF Library 17.0', 'creator': 'Adobe InDesign 20.2 (Macintosh)', 'creationdate': '2025-03-17T13:41:32-06:00', 'source': 'c:\\Users\\gunas\\python_projects\\Projects_git\\PDFs\\whitepaper_emebddings_vectorstores_v2.pdf', 'file_path': 'c:\\Users\\gunas\\python_projects\\Projects_git\\PDFs\\whitepaper_emebddings_vectorstores_v2.pdf', 'total_pages': 64, 'format': 'PDF 1.7', 'title': '', 'author': '', 'subject': '', 'keywords': '', 'moddate': '2025-11-09T20:31:04+05:30', 'trapped': '', 'modDate': "D:20251109203104+05'30'", 'creationDate': "D:20250317134132-06'00'", 'page': 0}, page_content='Embeddings  \n& Vector Stores\nAuthors: Anant Nawalgaria, \nXiaoqi Ren, and Charles Sugnet'),
 Document(metadata={'producer': 'Adobe PDF Library 17.0', 'creator': 'Adobe InDesign 20.2 (Macintosh)', 'creationdate': '2025-03-17T13:41:32-06:00', 'source': 'c:\\Users\\gunas\\python_projects\\Projects_git\\PDFs\\whitepaper_emebddings_vectorstores_v2.pdf', 'file_path': 'c

In [19]:
# Embedding and Vectore store DB
import numpy as np
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings 
import uuid
from typing import List, Dict, Any, Tuple
from sklearn.metrics.pairwise import cosine_similarity

In [16]:
class EmbeddingManager:
    """Handles document embedding generation using SentenceTransformer"""
    
    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        """
        Initialize the embedding manager
        
        Args:
            model_name: HuggingFace model name for sentence embeddings
        """
        self.model_name = model_name
        self.model = None
        self._load_model()

    def _load_model(self):
        """Load the SentenceTransformer model"""
        try:
            print(f"Loading embedding model: {self.model_name}")
            self.model = SentenceTransformer(self.model_name)
            print(f"Model loaded successfully. Embedding dimension: {self.model.get_sentence_embedding_dimension()}")
        except Exception as e:
            print(f"Error loading model {self.model_name}: {e}")
            raise

    def generate_embeddings(self, texts: List[str]) -> np.ndarray:
        """
        Generate embeddings for a list of texts
        
        Args:
            texts: List of text strings to embed
            
        Returns:
            numpy array of embeddings with shape (len(texts), embedding_dim)
        """
        if not self.model:
            raise ValueError("Model not loaded")
        
        print(f"Generating embeddings for {len(texts)} texts...")
        embeddings = self.model.encode(texts, show_progress_bar=True)
        print(f"Generated embeddings with shape: {embeddings.shape}")
        return embeddings


## initialize the embedding manager

embedding_manager=EmbeddingManager()
embedding_manager

Loading embedding model: all-MiniLM-L6-v2


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

Model loaded successfully. Embedding dimension: 384


<__main__.EmbeddingManager at 0x1fb749f5d60>

In [20]:
# VectorStore
class VectorStore:
    """Manages document embeddings in a ChromaDB vector store"""
    
    def __init__(self, collection_name: str = "pdf_documents", persist_directory: str = "../data/vector_store"):
        """
        Initialize the vector store
        
        Args:
            collection_name: Name of the ChromaDB collection
            persist_directory: Directory to persist the vector store
        """
        self.collection_name = collection_name
        self.persist_directory = persist_directory
        self.client = None
        self.collection = None
        self._initialize_store()

    def _initialize_store(self):
        """Initialize ChromaDB client and collection"""
        try:
            # Create persistent ChromaDB client
            os.makedirs(self.persist_directory, exist_ok=True)
            self.client = chromadb.PersistentClient(path=self.persist_directory)
            
            # Get or create collection
            self.collection = self.client.get_or_create_collection(
                name=self.collection_name,
                metadata={"description": "PDF document embeddings for RAG"}
            )
            print(f"Vector store initialized. Collection: {self.collection_name}")
            print(f"Existing documents in collection: {self.collection.count()}")
            
        except Exception as e:
            print(f"Error initializing vector store: {e}")
            raise

    def add_documents(self, documents: List[Any], embeddings: np.ndarray):
        """
        Add documents and their embeddings to the vector store
        
        Args:
            documents: List of LangChain documents
            embeddings: Corresponding embeddings for the documents
        """
        if len(documents) != len(embeddings):
            raise ValueError("Number of documents must match number of embeddings")
        
        print(f"Adding {len(documents)} documents to vector store...")
        
        # Prepare data for ChromaDB
        ids = []
        metadatas = []
        documents_text = []
        embeddings_list = []
        
        for i, (doc, embedding) in enumerate(zip(documents, embeddings)):
            # Generate unique ID
            doc_id = f"doc_{uuid.uuid4().hex[:8]}_{i}"
            ids.append(doc_id)
            
            # Prepare metadata
            metadata = dict(doc.metadata)
            metadata['doc_index'] = i
            metadata['content_length'] = len(doc.page_content)
            metadatas.append(metadata)
            
            # Document content
            documents_text.append(doc.page_content)
            
            # Embedding
            embeddings_list.append(embedding.tolist())
        
        # Add to collection
        try:
            self.collection.add(
                ids=ids,
                embeddings=embeddings_list,
                metadatas=metadatas,
                documents=documents_text
            )
            print(f"Successfully added {len(documents)} documents to vector store")
            print(f"Total documents in collection: {self.collection.count()}")
            
        except Exception as e:
            print(f"Error adding documents to vector store: {e}")
            raise

vectorstore=VectorStore()
vectorstore
    

Vector store initialized. Collection: pdf_documents
Existing documents in collection: 0


<__main__.VectorStore at 0x1fb74b2caa0>

In [None]:
chunks

In [None]:
### Convert the text to embeddings
texts=[doc.page_content for doc in chunks]

## Generate the Embeddings

embeddings=embedding_manager.generate_embeddings(texts)

##store int he vector dtaabase
vectorstore.add_documents(chunks,embeddings)

In [None]:
# Retriever pipeline from vectorstore
class RAGRetriever:
    """Handles query-based retrieval from the vector store"""
    
    def __init__(self, vector_store: VectorStore, embedding_manager: EmbeddingManager):
        """
        Initialize the retriever
        
        Args:
            vector_store: Vector store containing document embeddings
            embedding_manager: Manager for generating query embeddings
        """
        self.vector_store = vector_store
        self.embedding_manager = embedding_manager

    def retrieve(self, query: str, top_k: int = 5, score_threshold: float = 0.0) -> List[Dict[str, Any]]:
        """
        Retrieve relevant documents for a query
        
        Args:
            query: The search query
            top_k: Number of top results to return
            score_threshold: Minimum similarity score threshold
            
        Returns:
            List of dictionaries containing retrieved documents and metadata
        """
        print(f"Retrieving documents for query: '{query}'")
        print(f"Top K: {top_k}, Score threshold: {score_threshold}")
        
        # Generate query embedding
        query_embedding = self.embedding_manager.generate_embeddings([query])[0]
        
        # Search in vector store
        try:
            results = self.vector_store.collection.query(
                query_embeddings=[query_embedding.tolist()],
                n_results=top_k
            )
            
            # Process results
            retrieved_docs = []
            
            if results['documents'] and results['documents'][0]:
                documents = results['documents'][0]
                metadatas = results['metadatas'][0]
                distances = results['distances'][0]
                ids = results['ids'][0]
                
                for i, (doc_id, document, metadata, distance) in enumerate(zip(ids, documents, metadatas, distances)):
                    # Convert distance to similarity score (ChromaDB uses cosine distance)
                    similarity_score = 1 - distance
                    
                    if similarity_score >= score_threshold:
                        retrieved_docs.append({
                            'id': doc_id,
                            'content': document,
                            'metadata': metadata,
                            'similarity_score': similarity_score,
                            'distance': distance,
                            'rank': i + 1
                        })
                
                print(f"Retrieved {len(retrieved_docs)} documents (after filtering)")
            else:
                print("No documents found")
            
            return retrieved_docs
            
        except Exception as e:
            print(f"Error during retrieval: {e}")
            return []

rag_retriever=RAGRetriever(vectorstore,embedding_manager)

In [None]:
rag_retriever

In [None]:
rag_retriever.retrieve("What is attention is all you need")

In [None]:
# Rag pipeline Vector DB to LLM output generation
import os
from dotenv import load_dotenv
load_dotenv()

print(os.getenv("GROQ_API_KEY"))

In [None]:
from langchain_groq import ChatGroq
from langchain.prompts import PromptTemplate
from langchain.schema import HumanMessage, SystemMessage

In [None]:
class GroqLLM:
    def __init__(self, model_name: str = "gemma2-9b-it", api_key: str =None):
        """
        Initialize Groq LLM
        
        Args:
            model_name: Groq model name (qwen2-72b-instruct, llama3-70b-8192, etc.)
            api_key: Groq API key (or set GROQ_API_KEY environment variable)
        """
        self.model_name = model_name
        self.api_key = api_key or os.environ.get("GROQ_API_KEY")
        
        if not self.api_key:
            raise ValueError("Groq API key is required. Set GROQ_API_KEY environment variable or pass api_key parameter.")
        
        self.llm = ChatGroq(
            groq_api_key=self.api_key,
            model_name=self.model_name,
            temperature=0.1,
            max_tokens=1024
        )
        
        print(f"Initialized Groq LLM with model: {self.model_name}")

    def generate_response(self, query: str, context: str, max_length: int = 500) -> str:
        """
        Generate response using retrieved context
        
        Args:
            query: User question
            context: Retrieved document context
            max_length: Maximum response length
            
        Returns:
            Generated response string
        """
        
        # Create prompt template
        prompt_template = PromptTemplate(
            input_variables=["context", "question"],
            template="""You are a helpful AI assistant. Use the following context to answer the question accurately and concisely.

Context:
{context}

Question: {question}

Answer: Provide a clear and informative answer based on the context above. If the context doesn't contain enough information to answer the question, say so."""
        )
        
        # Format the prompt
        formatted_prompt = prompt_template.format(context=context, question=query)
        
        try:
            # Generate response
            messages = [HumanMessage(content=formatted_prompt)]
            response = self.llm.invoke(messages)
            return response.content
            
        except Exception as e:
            return f"Error generating response: {str(e)}"
        
    def generate_response_simple(self, query: str, context: str) -> str:
        """
        Simple response generation without complex prompting
        
        Args:
            query: User question
            context: Retrieved context
            
        Returns:
            Generated response
        """
        simple_prompt = f"""Based on this context: {context}

Question: {query}

Answer:"""
        
        try:
            messages = [HumanMessage(content=simple_prompt)]
            response = self.llm.invoke(messages)
            return response.content
        except Exception as e:
            return f"Error: {str(e)}"

In [None]:
# Initialize Groq LLM (you'll need to set GROQ_API_KEY environment variable)
try:
    groq_llm = GroqLLM(api_key=os.getenv("GROQ_API_KEY"))
    print("Groq LLM initialized successfully!")
except ValueError as e:
    print(f"Warning: {e}")
    print("Please set your GROQ_API_KEY environment variable to use the LLM.")
    groq_llm = None

In [None]:
### get the context from the retriever and pass it to the LLM

rag_retriever.retrieve("Unified Multi-task Learning Framework")

In [None]:
# Integrating Vectordb Context pipeline with LLM output

### Simple RAG pipeline with Groq LLM
from langchain_groq import ChatGroq
import os
from dotenv import load_dotenv
load_dotenv()

### Initialize the Groq LLM (set your GROQ_API_KEY in environment)
groq_api_key = os.getenv("GROQ_API_KEY")

llm=ChatGroq(groq_api_key=groq_api_key,model_name="gemma2-9b-it",temperature=0.1,max_tokens=1024)

## 2. Simple RAG function: retrieve context + generate response
def rag_simple(query,retriever,llm,top_k=3):
    ## retriever the context
    results=retriever.retrieve(query,top_k=top_k)
    context="\n\n".join([doc['content'] for doc in results]) if results else ""
    if not context:
        return "No relevant context found to answer the question."
    
    ## generate the answwer using GROQ LLM
    prompt=f"""Use the following context to answer the question concisely.
        Context:
        {context}

        Question: {query}

        Answer:"""
    
    response=llm.invoke([prompt.format(context=context,query=query)])
    return response.content

In [None]:
answer=rag_simple("What is attention mechanism?",rag_retriever,llm)
print(answer)

In [None]:
# Enhanced rag pipeline features

# --- Enhanced RAG Pipeline Features ---
def rag_advanced(query, retriever, llm, top_k=5, min_score=0.2, return_context=False):
    """
    RAG pipeline with extra features:
    - Returns answer, sources, confidence score, and optionally full context.
    """
    results = retriever.retrieve(query, top_k=top_k, score_threshold=min_score)
    if not results:
        return {'answer': 'No relevant context found.', 'sources': [], 'confidence': 0.0, 'context': ''}
    
    # Prepare context and sources
    context = "\n\n".join([doc['content'] for doc in results])
    sources = [{
        'source': doc['metadata'].get('source_file', doc['metadata'].get('source', 'unknown')),
        'page': doc['metadata'].get('page', 'unknown'),
        'score': doc['similarity_score'],
        'preview': doc['content'][:300] + '...'
    } for doc in results]
    confidence = max([doc['similarity_score'] for doc in results])
    
    # Generate answer
    prompt = f"""Use the following context to answer the question concisely.\nContext:\n{context}\n\nQuestion: {query}\n\nAnswer:"""
    response = llm.invoke([prompt.format(context=context, query=query)])
    
    output = {
        'answer': response.content,
        'sources': sources,
        'confidence': confidence
    }
    if return_context:
        output['context'] = context
    return output

# Example usage:
result = rag_advanced("Hard Negative Mining Technqiues", rag_retriever, llm, top_k=3, min_score=0.1, return_context=True)
print("Answer:", result['answer'])
print("Sources:", result['sources'])
print("Confidence:", result['confidence'])
print("Context Preview:", result['context'][:300])

In [None]:
# --- Advanced RAG Pipeline: Streaming, Citations, History, Summarization ---
from typing import List, Dict, Any
import time

class AdvancedRAGPipeline:
    def __init__(self, retriever, llm):
        self.retriever = retriever
        self.llm = llm
        self.history = []  # Store query history

    def query(self, question: str, top_k: int = 5, min_score: float = 0.2, stream: bool = False, summarize: bool = False) -> Dict[str, Any]:
        # Retrieve relevant documents
        results = self.retriever.retrieve(question, top_k=top_k, score_threshold=min_score)
        if not results:
            answer = "No relevant context found."
            sources = []
            context = ""
        else:
            context = "\n\n".join([doc['content'] for doc in results])
            sources = [{
                'source': doc['metadata'].get('source_file', doc['metadata'].get('source', 'unknown')),
                'page': doc['metadata'].get('page', 'unknown'),
                'score': doc['similarity_score'],
                'preview': doc['content'][:120] + '...'
            } for doc in results]
            # Streaming answer simulation
            prompt = f"""Use the following context to answer the question concisely.\nContext:\n{context}\n\nQuestion: {question}\n\nAnswer:"""
            if stream:
                print("Streaming answer:")
                for i in range(0, len(prompt), 80):
                    print(prompt[i:i+80], end='', flush=True)
                    time.sleep(0.05)
                print()
            response = self.llm.invoke([prompt.format(context=context, question=question)])
            answer = response.content

        # Add citations to answer
        citations = [f"[{i+1}] {src['source']} (page {src['page']})" for i, src in enumerate(sources)]
        answer_with_citations = answer + "\n\nCitations:\n" + "\n".join(citations) if citations else answer

        # Optionally summarize answer
        summary = None
        if summarize and answer:
            summary_prompt = f"Summarize the following answer in 2 sentences:\n{answer}"
            summary_resp = self.llm.invoke([summary_prompt])
            summary = summary_resp.content

        # Store query history
        self.history.append({
            'question': question,
            'answer': answer,
            'sources': sources,
            'summary': summary
        })

        return {
            'question': question,
            'answer': answer_with_citations,
            'sources': sources,
            'summary': summary,
            'history': self.history
        }

# Example usage:
adv_rag = AdvancedRAGPipeline(rag_retriever, llm)
result = adv_rag.query("what is attention is all you need", top_k=3, min_score=0.1, stream=True, summarize=True)
print("\nFinal Answer:", result['answer'])
print("Summary:", result['summary'])
print("History:", result['history'][-1])