```
Conflicting Policy RAG System
A RAG pipeline that handles temporal conflicts, noise filtering, and source attribution.
```


In [None]:
import os
import re
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional

from dotenv import load_dotenv
from google import genai
from langchain_community.vectorstores import Chroma
from langchain_core.documents import Document
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from pydantic import BaseModel, Field, field_validator

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
# PYDANTIC MODELS FOR TYPE SAFETY

class DocType(str, Enum):
    """Kinds of knowledge-base document the pipeline tells apart.

    Mixing in ``str`` makes each member compare equal to its plain string
    value (``DocType.MENU == "menu"``).
    """

    POLICY = "policy"
    MENU = "menu"
    MEMO = "memo"
    GENERAL = "general"

class PolicyMetaData(BaseModel):
    """Type-safe Meta Data for Documents"""

    # Field descriptions are part of the model's JSON schema, so they are kept
    # in sync with the actual defaults and with the validator's error message.
    source: str = Field(description="FileName of the document")
    doc_type: DocType = Field(description="Type of document: policy, menu, memo, general")
    effective_date: str = Field(description="ISO format date (YYYY-MM-DD) when the policy became effective")
    # 0 means "no version marker found"; this matches the declared default.
    version: int = Field(default=0, description="Version of the document if available otherwise 0")
    year: int = Field(description="Year extracted from effective-date")

    @field_validator('effective_date', mode='after')
    @classmethod
    def validate_date(cls, value: str) -> str:
        """Reject values that ``datetime.fromisoformat`` cannot parse.

        Args:
            value: Candidate date string.

        Returns:
            ``value`` unchanged when it parses. Anything ``fromisoformat``
            accepts passes, including strings that carry a time component.

        Raises:
            ValueError: If ``value`` is not an ISO-format date; the original
                parsing error is attached as the cause.
        """
        try:
            datetime.fromisoformat(value)
        except ValueError as exc:
            raise ValueError(f"Must be ISO format date (YYYY-MM-DD), got: {value}") from exc
        return value

In [3]:
class PolicyAnswer(BaseModel):
    """Structured LLM output with enforced citations"""

    # All four fields are required (none declares a default); the last two
    # accept an explicit null.
    answer: str = Field(description="Direct answer to the employee's question")
    reasoning: str = Field(description="Brief explanation for choosing this answer")
    cited_sources: Optional[List[str]] = Field(
        description="Exact filenames of documents used as sources"
    )
    policy_allows_remote: Optional[bool] = Field(
        description="Whether the current policy allows remote work (true/false/null if not applicable)"
    )

class QueryIntent(BaseModel):
    """Structured LLM output for classifying query intent for better query understanding"""

    # Field()'s first positional argument is the *default value*, not the
    # description. The explanatory text is therefore passed as `description=`
    # so that it reaches the JSON schema and the fields stay required instead
    # of silently defaulting to a sentence of prose.
    intent: DocType = Field(description="Type of query: policy, menu, memo, general")
    reasoning: str = Field(description="Brief reasoning for the query classification ( 1 line )")
    confidence: int = Field(description="Confidence score between 1 and 5", ge=1, le=5)

In [67]:
# METADATA EXTRACTOR
class DocumentMetadata:
    """
    Extract metadata (document type, effective date, version) for one file.

    Args:
        filename: Bare file name, e.g. ``policy_v2_2024.txt``.
        content: Full text of the file.
        base_dir: Directory the file lives in. Only consulted when no date is
            found in the file name or the text and the file's modification
            time has to be used instead.

    Raises:
        OSError: If the modification-time fallback is needed and
            ``base_dir / filename`` does not exist.
    """

    # "Effective Date: January 5, 2024": month, day and year are captured
    # separately so the amount of whitespace around the comma is irrelevant.
    _TEXT_DATE_RE = re.compile(r'Effective Date:\s*([A-Za-z]+)\s+(\d{1,2}),\s*(\d{4})')
    # "Effective Date: 2024-01-05"
    _ISO_DATE_RE = re.compile(r'Effective Date:\s*(\d{4}-\d{2}-\d{2})')
    # Full month names first, then three-letter abbreviations.
    _TEXT_DATE_FORMATS = ("%B %d, %Y", "%b %d, %Y")

    def __init__(self, filename, content, base_dir="knowledge_base"):
        self.filename = filename
        self.content = content
        self.base_dir = base_dir

        self.docType = self._classify_doc_type()
        self.effective_date = self._extract_date()
        self.version = self._extract_version_info()

    def _classify_doc_type(self) -> str:
        """
        Classify the document from keywords in its file name.

        Returns:
            The string value of the matching ``DocType`` member; ``general``
            when no keyword matches.
        """
        filename = self.filename.lower()

        if 'policy' in filename:
            return DocType.POLICY.value
        elif 'menu' in filename or 'cafeteria' in filename:
            return DocType.MENU.value
        elif 'memo' in filename:
            return DocType.MEMO.value
        else:
            return DocType.GENERAL.value

    def _extract_date(self) -> datetime:
        """
        Determine the document's effective date.

        Sources are tried in order:
          1. a four-digit year right before ``.txt`` in the file name
             (mapped to January 1st of that year),
          2. an ``Effective Date: <Month> <day>, <year>`` line in the text,
          3. an ``Effective Date: YYYY-MM-DD`` line in the text,
          4. the file's modification time (a warning is printed).

        Returns:
            The effective date as a ``datetime``.
        """
        # 1. File name - cheapest and most reliable.
        year_match = re.search(r'_(\d{4})\.txt', self.filename)
        if year_match:
            try:
                return datetime(int(year_match.group(1)), 1, 1)
            except ValueError:
                # e.g. "_0000.txt" is not a representable year; keep looking.
                pass

        # 2. Textual date in the content.
        text_match = self._TEXT_DATE_RE.search(self.content)
        if text_match:
            month, day, year = text_match.groups()
            normalized = f"{month} {day}, {year}"
            for date_format in self._TEXT_DATE_FORMATS:
                try:
                    return datetime.strptime(normalized, date_format)
                except ValueError:
                    continue

        # 3. ISO date in the content.
        iso_match = self._ISO_DATE_RE.search(self.content)
        if iso_match:
            try:
                return datetime.strptime(iso_match.group(1), '%Y-%m-%d')
            except ValueError:
                # Digits in the right shape but not a real date (e.g. month 13).
                pass

        # 4. Last resort: modification time of the file on disk.
        file_path = Path(self.base_dir, self.filename).resolve()
        doc_date = datetime.fromtimestamp(os.path.getmtime(file_path))
        print(f"Warning: No date in {self.filename}, using file date: {doc_date.strftime('%Y-%m-%d')}")
        return doc_date

    def _extract_version_info(self):
        """
        Extract the version number from a ``_v<N>`` marker in the file name.

        The marker must be followed by ``_``, ``.`` or the end of the name, so
        both ``policy_v2_2024.txt`` and ``policy_v2.txt`` yield 2.

        Returns:
            The version as an ``int``, or 0 when the name has no marker.
        """
        version_match = re.search(r'_v(\d+)(?=[_.]|$)', self.filename, re.IGNORECASE)
        if version_match:
            return int(version_match.group(1))
        return 0

    def _to_pydantic(self) -> "PolicyMetaData":
        """Build the validated ``PolicyMetaData`` record for this document."""
        return PolicyMetaData(
            source=self.filename,
            doc_type=self.docType,
            # Date only (YYYY-MM-DD), as the field's description promises.
            effective_date=self.effective_date.date().isoformat(),
            version=self.version,
            year=self.effective_date.year
        )

    def _to_langchain_document(self) -> "Document":
        """Wrap the full text and its metadata in a LangChain ``Document``."""
        return Document(
            page_content=self.content,
            metadata=self._to_pydantic().model_dump()
        )

In [None]:
class RagEngine:
    """
    RAG engine with conflict resolution and noise filtering.

    - LangChain vector store (for embeddings only)
    - Pydantic validation of metadata and of the model's JSON output
    - Native Gemini API with JSON-schema constrained responses

    Typical use: construct, call ``ingest_documents()`` once, then call
    ``query()`` for each question.
    """

    # Gemini model used both for intent classification and for answering.
    GENERATION_MODEL = "gemini-2.5-flash"

    def __init__(self, knowledge_base_path: str = "knowledge_base"):
        """
        Create the Gemini client and the embedding wrapper.

        Args:
            knowledge_base_path: Directory holding the ``.txt`` documents.

        Raises:
            ValueError: If ``GEMINI_API_KEY`` is not set in the environment.
        """
        self.kb_path = Path(knowledge_base_path)
        self.api_key = os.getenv("GEMINI_API_KEY")

        if not self.api_key:
            raise ValueError("GEMINI_API_KEY not found in environment")

        print("Initializing RAG Pipeline...")

        self.client = genai.Client(api_key=self.api_key)

        # NOTE(review): confirm that this wrapper accepts the hyphenated,
        # lower-case task type and honours output_dimensionality.
        self.embedding = GoogleGenerativeAIEmbeddings(
            model="gemini-embedding-001",
            google_api_key=self.api_key,
            output_dimensionality=768,
            task_type="retrieval-document"
        )

        # Populated by ingest_documents().
        self.vectorstore = None

    def ingest_documents(self):
        """
        Read every ``.txt`` file in the knowledge base and index it.

        Each file becomes exactly one document (no chunking) carrying the
        metadata produced by ``DocumentMetadata``.

        Raises:
            FileNotFoundError: If the directory is missing or has no ``.txt``
                files.
        """
        print(f"📚 Ingesting documents from: {self.kb_path}")

        if not self.kb_path.exists():
            raise FileNotFoundError(f"Knowledge base not found: {self.kb_path}")

        # Sorted so the processing order (and the log) is stable across runs.
        txt_files = sorted(self.kb_path.glob("*.txt"))

        if not txt_files:
            raise FileNotFoundError(f"No .txt files in {self.kb_path}")

        print(f"Found {len(txt_files)} files\n")

        all_documents = []

        for filepath in txt_files:
            print(f"  📄 Processing: {filepath.name}")

            content = filepath.read_text(encoding='utf-8')

            # NOTE(review): DocumentMetadata receives only the bare file name,
            # so any path-based lookup it performs is not guaranteed to use
            # self.kb_path - confirm when the knowledge base lives elsewhere.
            metadata_extractor = DocumentMetadata(filepath.name, content)

            # Display extracted metadata
            meta = metadata_extractor._to_pydantic()
            print(f"      Type: {meta.doc_type}")
            print(f"      Date: {meta.year}")
            print(f"      Version: {meta.version}")

            # Whole document is indexed as-is (no chunking).
            all_documents.append(metadata_extractor._to_langchain_document())
            print()

        print("🔨 Creating vector store...")
        self.vectorstore = Chroma.from_documents(
            documents=all_documents,
            embedding=self.embedding,
            # Stable ids (the file names) so that re-running ingestion against
            # the same collection replaces documents instead of adding
            # duplicates.
            ids=[doc.metadata["source"] for doc in all_documents],
            collection_name="techcorp_docs"
        )

        print(f"✅ Indexed {len(all_documents)} documents from {len(txt_files)} files\n")

    def _retrieve_documents(self, query: str, k: int = 10) -> "List[Document]":
        """
        Retrieve the ``k`` most similar documents from the vector store.

        Raises:
            ValueError: If ``ingest_documents()`` has not been called yet.
        """
        if not self.vectorstore:
            raise ValueError("Documents not ingested. Call ingest_documents() first.")

        # Callers filter this list afterwards, so k is an upper bound.
        results = self.vectorstore.similarity_search(query, k=k)
        print(f"    Retrieved {len(results)} documents")

        return results

    def _classify_query_intent(self, query: str) -> str:
        """
        Use Gemini to classify which kind of document the query needs.

        Returns:
            One of: 'policy', 'menu', 'memo', 'general'. Any failure (API
            error, invalid JSON, schema violation) is logged and mapped to
            'general', so callers always receive a plain string.
        """
        prompt = f"""Classify this employee query into ONE category based on what type of document would answer it:

- "policy" - Questions about rules, permissions, procedures, what's allowed/not allowed, work requirements, benefits, HR matters, remote work, time off, company guidelines
- "menu" - Questions about food, cafeteria, meals, dining, lunch, dinner, breakfast
- "memo" - Questions about announcements, updates, communications, notices
- "general" - Unclear or could need multiple document types

Query: {query}

Respond with a JSON object containing "intent" (policy, menu, memo, or general), "reasoning" (one line) and "confidence" (integer from 1 to 5)."""

        try:
            response = self.client.models.generate_content(
                model=self.GENERATION_MODEL,
                contents=prompt,
                config={
                    "temperature": 0,
                    "response_mime_type": "application/json",
                    "response_json_schema": QueryIntent.model_json_schema()
                }
            )

            result = QueryIntent.model_validate_json(response.text)
            intent = result.intent.value

            print(f"    Query intent: {result.intent}")
            return intent

        except Exception as e:
            # Deliberate best-effort: classification must never abort a query.
            print(f"    Intent classification failed: {e}, defaulting to 'general'")
            return 'general'

    def _filter_documents_by_metadata(self, documents: "List[Document]", query: str) -> "List[Document]":
        """
        Filter documents based on query intent and metadata.

        - 'menu' / 'memo': keep only documents of that type.
        - 'policy': keep only policy documents, then only the latest one.
        - anything else: keep every type, but only the latest policy.
        """
        # Step 1: Classify query intent (always a plain string).
        intent = self._classify_query_intent(query)

        print(f"\n  Intent: {intent}")
        print(f"    Documents before filtering: {len(documents)}")

        # Step 2: Filter by document type based on intent
        if intent in ('menu', 'memo'):
            filtered_docs = [d for d in documents if d.metadata.get('doc_type') == intent]
            print(f"    Keeping only {intent.upper()} documents: {len(filtered_docs)}")
            return filtered_docs

        if intent == 'policy':
            policy_docs = [d for d in documents if d.metadata.get('doc_type') == 'policy']
            print(f"    Found {len(policy_docs)} policy documents")
            return self._filter_latest_policy(policy_docs)

        # general
        print("    General query - filtering all document types")
        return self._filter_latest_policy(documents)

    def _filter_latest_policy(self, documents: "List[Document]") -> "List[Document]":
        """
        Keep only the latest version of policy documents.

        "Latest" means the highest year, with version as the tie-breaker.
        Non-policy documents pass through unchanged and are placed after the
        surviving policy.
        """
        if not documents:
            return []

        # Separate policies from other doc types
        policy_docs = [d for d in documents if d.metadata.get('doc_type') == 'policy']
        other_docs = [d for d in documents if d.metadata.get('doc_type') != 'policy']

        # Zero or one policy: nothing can conflict.
        if len(policy_docs) <= 1:
            return documents

        print(f"        Multiple policies detected: {len(policy_docs)}")

        latest_policy = None
        latest_year = 0
        latest_version = 0

        for doc in policy_docs:
            year = doc.metadata.get('year', 0)
            version = doc.metadata.get('version', 0)
            source = doc.metadata.get('source', 'unknown')

            print(f"      - {source}: year={year}, version={version}")

            # Compare by year first, then version; strict '>' keeps the first
            # document on an exact tie.
            if (year, version) > (latest_year, latest_version):
                latest_policy = doc
                latest_year = year
                latest_version = version

        if latest_policy:
            print(f"        Keeping latest: {latest_policy.metadata.get('source')} "
                  f"(year: {latest_year}, v{latest_version})")
            return [latest_policy] + other_docs

        # No policy carried usable year/version metadata: leave input as is.
        return documents

    def retrieve_relevant_context(self, query: str, k: int = 5) -> "List[Document]":
        """
        Main retrieval method:
        1. Retrieve top-k documents by similarity
        2. Filter by document metadata (conflict resolution)
        3. Return only relevant and up-to-date documents
        """
        print(" RETRIEVAL PHASE")
        print("-" * 70)

        # Step 1: Vector similarity search
        documents = self._retrieve_documents(query, k=k)

        # Step 2: Filter by metadata and get latest versions
        filtered_docs = self._filter_documents_by_metadata(documents, query)

        print(f"    Final documents: {len(filtered_docs)}")
        print("-" * 70 + "\n")

        return filtered_docs

    def _build_context(self, documents: "List[Document]") -> str:
        """
        Render the filtered documents as one prompt-ready string.

        Each document contributes a header (source, type, year, version,
        effective date) followed by its complete text.
        """
        context_parts = []

        for doc in documents:
            meta = doc.metadata

            context_parts.append(
                f"=== Document: {meta.get('source', 'unknown')} ===\n"
                f"Type: {meta.get('doc_type', 'unknown')}\n"
                f"Year: {meta.get('year', 'N/A')}\n"
                f"Version: v{meta.get('version', 'N/A')}\n"
                f"Effective Date: {meta.get('effective_date', 'N/A')}\n\n"
                f"Content:\n{doc.page_content}\n"
            )

        return "\n".join(context_parts)

    def query(self, question: str, k: int = 5) -> "PolicyAnswer":
        """
        Main query method.

        Args:
            question: User's question
            k: Number of documents to pull from the vector store before
                metadata filtering.

        Returns:
            A ``PolicyAnswer``. When no document survives retrieval and
            filtering, the answer says so and ``cited_sources`` is empty, so
            callers can always treat the result as a ``PolicyAnswer``.
        """
        print("="*70)
        print(f"QUERY: {question}")
        print("="*70 + "\n")

        relevant_docs = self.retrieve_relevant_context(question, k=k)

        if not relevant_docs:
            return PolicyAnswer(
                answer="No relevant documents found to answer your question.",
                reasoning="No documents remained after retrieval and metadata filtering.",
                cited_sources=[],
                policy_allows_remote=None,
            )

        context = self._build_context(relevant_docs)

        answer = self._generate_answer(question, context)

        print("\n   Generated answer")
        return answer

    def _generate_answer(self, question: str, context: str) -> "PolicyAnswer":
        """
        Ask Gemini to answer ``question`` using only ``context``.

        The response is constrained to the ``PolicyAnswer`` JSON schema and
        validated before being returned; a validation failure propagates.
        """
        prompt = f"""You are a helpful HR assistant for TechCorp Inc.

Answer the employee's question using ONLY the provided documents.

1. ONLY answer if the documents contain relevant information to the question.
2. ALWAYS prioritize the MOST RECENT policy when there are conflicts
3. If an older policy contradicts a newer policy, the NEWER policy wins
4. If the documents DO NOT contain information to answer the question, respond with:
   "answer": "I don't have information about that in the company documents. I can only help with TechCorp policies, menus, and memos.", "cited_sources": []
5. DO NOT make up information or use knowledge outside the provided documents
6. Be direct and concise

Documents:
{context}

EMPLOYEE QUESTION: {question}

ANSWER (with citations):"""

        print("Generating answer with Gemini...\n")

        response = self.client.models.generate_content(
            model=self.GENERATION_MODEL,
            contents=prompt,
            config={
                "response_mime_type": "application/json",
                "response_json_schema": PolicyAnswer.model_json_schema(),
                "temperature": 0.5
            }
        )

        return PolicyAnswer.model_validate_json(response.text)


In [91]:
# Load GEMINI_API_KEY (and any other settings) from a local .env file before
# the engine reads the environment; load_dotenv is imported in the import cell.
load_dotenv()

rag = RagEngine(knowledge_base_path="knowledge_base")

Initializing RAG Pipeline...


In [92]:
# Read every .txt file in the knowledge base, extract its metadata and build the vector store.
rag.ingest_documents()

üìö Ingesting documents from: knowledge_base
Found 3 files

  üìÑ Processing: policy_v1_2021.txt
      Type: DocType.POLICY
      Date: 2021
      Version: 1

  üìÑ Processing: policy_v2_2024.txt
      Type: DocType.POLICY
      Date: 2024
      Version: 2

  üìÑ Processing: friday_cafeteria_menu.txt
      Type: DocType.MENU
      Date: 2026
      Version: 0

üî® Creating vector store...
‚úÖ Indexed 3 documents from 3 files



In [93]:
# Policy question with a distractor: "Friday" also appears in the cafeteria menu's file name.
answer = rag.query("Can I work fully remotely this Friday?")

QUERY: Can I work fully remotely this Friday?

üîç RETRIEVAL PHASE
----------------------------------------------------------------------
    Retrieved 3 documents
    Query intent: DocType.POLICY

  üìã Intent: policy
  üìÑ Documents before filtering: 3
  üìë Found 2 policy documents
    üîç Multiple policies detected: 2
      - policy_v2_2024.txt: year=2024, version=2
      - policy_v1_2021.txt: year=2021, version=1
    ‚úÖ Keeping latest: policy_v2_2024.txt (year: 2024, v2)
  ‚úÖ Final documents: 1
----------------------------------------------------------------------

Generating answer with Gemini...


‚úÖ Generated answer


In [94]:
# Show the structured answer as indented JSON.
print(answer.model_dump_json(indent = 2))

{
  "answer": "You may be able to work remotely this Friday, as remote work is capped at 1 day per week. However, this must be approved by your manager, and you are expected to be in the office for the remaining 4 days of the week.",
  "reasoning": "The TechCorp Return to Office Mandate (v2, 2024) clearly states that remote work is capped at 1 day per week and requires manager approval. Employees are expected to be in the office 4 days a week, and the 100% remote work policy was revoked.",
  "cited_sources": [
    "policy_v2_2024.txt"
  ],
  "policy_allows_remote": true
}


In [95]:
# Same policy question without the day-of-week distractor.
answer = rag.query("Can I work fully remotely?")

QUERY: Can I work fully remotely?

üîç RETRIEVAL PHASE
----------------------------------------------------------------------
    Retrieved 3 documents
    Query intent: DocType.POLICY

  üìã Intent: policy
  üìÑ Documents before filtering: 3
  üìë Found 2 policy documents
    üîç Multiple policies detected: 2
      - policy_v1_2021.txt: year=2021, version=1
      - policy_v2_2024.txt: year=2024, version=2
    ‚úÖ Keeping latest: policy_v2_2024.txt (year: 2024, v2)
  ‚úÖ Final documents: 1
----------------------------------------------------------------------

Generating answer with Gemini...


‚úÖ Generated answer


In [96]:
# Show the structured answer as indented JSON.
print(answer.model_dump_json(indent = 2))

{
  "answer": "No, TechCorp's current policy does not allow for fully remote work. Remote work is capped at 1 day per week and must be approved by a manager. The 100% remote work policy from 2021 has been officially revoked.",
  "reasoning": "The policy_v2_2024.txt document explicitly states that the 100% remote work policy from 2021 is revoked and that remote work is now capped at 1 day per week.",
  "cited_sources": [
    "policy_v2_2024.txt"
  ],
  "policy_allows_remote": false
}


In [97]:
# Menu question: only menu-type documents should survive the metadata filter.
answer = rag.query("What is the menu for Friday?")


QUERY: What is the menu for Friday?

üîç RETRIEVAL PHASE
----------------------------------------------------------------------
    Retrieved 3 documents
    Query intent: DocType.MENU

  üìã Intent: menu
  üìÑ Documents before filtering: 3
  ‚úÖ Keeping only MENU documents: 1
  ‚úÖ Final documents: 1
----------------------------------------------------------------------

Generating answer with Gemini...


‚úÖ Generated answer


In [98]:
# Show the structured answer as indented JSON.
print(answer.model_dump_json(indent = 2))

{
  "answer": "The menu for Friday is Fish & Chips (Chef's Special!). Please note that the cafeteria is closed for cleaning on Friday afternoons.",
  "reasoning": "The 'friday_cafeteria_menu.txt' document explicitly states the menu for Friday and a note about cafeteria closure.",
  "cited_sources": [
    "friday_cafeteria_menu.txt"
  ],
  "policy_allows_remote": null
}


In [99]:
# Out-of-scope question: the prompt instructs the model to decline and cite nothing.
answer = rag.query("What is happening in America Today?")

QUERY: What is happening in America Today?

üîç RETRIEVAL PHASE
----------------------------------------------------------------------
    Retrieved 3 documents
    Query intent: DocType.GENERAL

  üìã Intent: general
  üìÑ Documents before filtering: 3
  üîç General query - filtering all document types
    üîç Multiple policies detected: 2
      - policy_v1_2021.txt: year=2021, version=1
      - policy_v2_2024.txt: year=2024, version=2
    ‚úÖ Keeping latest: policy_v2_2024.txt (year: 2024, v2)
  ‚úÖ Final documents: 2
----------------------------------------------------------------------

Generating answer with Gemini...


‚úÖ Generated answer


In [100]:
# Show the structured answer as indented JSON.
print(answer.model_dump_json(indent = 2))

{
  "answer": "I don't have information about that in the company documents. I can only help with TechCorp policies, menus, and memos.",
  "reasoning": "The provided documents contain information about TechCorp's return to office policy and cafeteria menu, but no information regarding current events in America.",
  "cited_sources": [],
  "policy_allows_remote": null
}
