In [7]:
from docling.document_converter import DocumentConverter

source = r"C:\Arnab's Projects\Python\JG-INSURANCE\Insurance_DB_V1.xlsx"
converter = DocumentConverter()
result = converter.convert(source)
print(result.document.export_to_markdown())

2025-09-23 23:15:34,390 - INFO - detected formats: [<InputFormat.XLSX: 'xlsx'>]
2025-09-23 23:15:34,609 - INFO - Going to convert document batch...
2025-09-23 23:15:34,610 - INFO - Initializing pipeline for SimplePipeline with options hash 995a146ad601044538e6a923bea22f4e
2025-09-23 23:15:34,611 - INFO - Processing document Insurance_DB_V1.xlsx
2025-09-23 23:15:34,612 - INFO - Processing sheet: Solvency Ratio
2025-09-23 23:15:34,946 - INFO - Processing sheet: Persistency ratio
2025-09-23 23:15:34,959 - INFO - Processing sheet: Total Premium Matrix
2025-09-23 23:15:34,966 - INFO - Processing sheet: Grievances
2025-09-23 23:15:35,059 - INFO - Processing sheet: Claim Settlement Ratio
2025-09-23 23:15:35,063 - INFO - Processing sheet: Death Claim Details
2025-09-23 23:15:36,032 - INFO - Processing sheet: Eligibility Filter
2025-09-23 23:15:36,033 - INFO - Finished converting document Insurance_DB_V1.xlsx in 1.64 sec.


| Insurer                                           | 2022-06-01 00:00:00   | 2022-09-01 00:00:00   | 2022-12-01 00:00:00   | 2023-03-01 00:00:00   | 2023-06-01 00:00:00   | 2023-09-01 00:00:00   | 2023-12-01 00:00:00   | 2024-03-01 00:00:00   |
|---------------------------------------------------|-----------------------|-----------------------|-----------------------|-----------------------|-----------------------|-----------------------|-----------------------|-----------------------|
| Public Sector                                     | None                  | None                  | None                  | None                  | None                  | None                  | None                  | None                  |
| Life Insurance Corporation of India               | 1.89                  | 1.88                  | 1.85                  | 1.87                  | 1.89                  | 1.9                   | 1.93                  | 1.98                  |
| Private Sector

In [None]:
from pathlib import Path
from docling.document_converter import DocumentConverter

# --- CONFIG ---

input_excel = Path("Insurance_DB_V1.xlsx")
output_dir = Path("md_exports")
output_dir.mkdir(exist_ok=True)

# --- INIT ---

converter = DocumentConverter()
result = converter.convert(str(input_excel))
doc = result.document  # DoclingDocument


# --- OPTION 1: One Markdown file for whole Excel ---

full_md = doc.export_to_markdown()
with open(output_dir / "all_sheets.md", "w", encoding="utf-8") as f:
    f.write(full_md)
print(f"✅ Exported all sheets → {output_dir / 'all_sheets.md'}")

# --- OPTION 2: Separate Markdown per sheet (worksheet) ---

# Note: DoclingDocument.sections (or similar) if Excel gets parsed into sections
for idx, section in enumerate(doc.content.sections):
    sheet_name = section.title or f"Sheet{idx + 1}"

    # Export only that section (sub-doc) to markdown
    # Using export_to_markdown with range of elements corresponding to that section
    # If section doesn’t directly map to elements, you may need to filter

    md_text = section.export_to_markdown()  # If section is a DoclingDocument or similar
    # If section is not a full Document type, you may instead do something like:
    # md_text = doc.export_to_markdown(from_element=section.start, to_element=section.end)

    file_path = output_dir / f"{sheet_name}.md"
    with open(file_path, "w", encoding="utf-8") as f:
        f.write(f"# {sheet_name}\n\n")
        f.write(md_text)
    print(f"✅ Exported {sheet_name} → {file_path}")


2025-09-23 23:23:44,457 - INFO - detected formats: [<InputFormat.XLSX: 'xlsx'>]


2025-09-23 23:23:44,683 - INFO - Going to convert document batch...
2025-09-23 23:23:44,684 - INFO - Initializing pipeline for SimplePipeline with options hash 995a146ad601044538e6a923bea22f4e
2025-09-23 23:23:44,685 - INFO - Processing document Insurance_DB_V1.xlsx
2025-09-23 23:23:44,686 - INFO - Processing sheet: Solvency Ratio
2025-09-23 23:23:44,694 - INFO - Processing sheet: Persistency ratio
2025-09-23 23:23:44,729 - INFO - Processing sheet: Total Premium Matrix
2025-09-23 23:23:44,736 - INFO - Processing sheet: Grievances
2025-09-23 23:23:44,799 - INFO - Processing sheet: Claim Settlement Ratio
2025-09-23 23:23:44,801 - INFO - Processing sheet: Death Claim Details
2025-09-23 23:23:45,832 - INFO - Processing sheet: Eligibility Filter
2025-09-23 23:23:45,835 - INFO - Finished converting document Insurance_DB_V1.xlsx in 1.38 sec.


AttributeError: 'DoclingDocument' object has no attribute 'content'

In [3]:
"""
Insurance Eligibility Classification System using LangChain and Ollama
"""

import json
from typing import Dict, Any
from pydantic import BaseModel, Field
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_ollama import ChatOllama


# Define the response schema using Pydantic
class InsuranceClassification(BaseModel):
    """Schema for insurance eligibility classification response"""

    classification: str = Field(
        description="Classification result: 'Compliant', 'Non-compliant', or 'No Conclusive Evidence'"
    )
    reason: str = Field(
        description="Brief explanation citing specific rules that led to the classification"
    )


class InsuranceUnderwriter:
    """
    Insurance eligibility classification system using LangChain and OllamaChat
    """

    def __init__(self, model_name: str = "llama3.1", temperature: float = 0.0):
        """
        Initialize the insurance underwriter

        Args:
            model_name: Ollama model name (e.g., 'llama3.1', 'llama3.2', 'qwen2.5')
            temperature: Model temperature for consistency (0.0 for deterministic results)
        """
        self.model_name = model_name
        self.temperature = temperature

        # Initialize the LLM
        self.llm = ChatOllama(
            model=model_name,
            temperature=temperature,
            format="json",  # Force JSON output format
        )

        # Set up the JSON output parser
        self.parser = JsonOutputParser(pydantic_object=InsuranceClassification)

        # Define the system prompt with detailed classification rules
        self.system_prompt = self._create_system_prompt()

        # Create the prompt template
        self.prompt_template = PromptTemplate(
            template="{system_prompt}\\n\\nUser Profile Data:\\n{user_profile}\\n\\n{format_instructions}",
            input_variables=["user_profile"],
            partial_variables={
                "system_prompt": self.system_prompt,
                "format_instructions": self.parser.get_format_instructions(),
            },
        )

        # Create the processing chain
        self.chain = self.prompt_template | self.llm | self.parser

    def _create_system_prompt(self) -> str:
        """Create the detailed system prompt for insurance underwriting"""
        return """You are a professional insurance underwriter. Given a user's profile (age, occupation, health, lifestyle, legal status, income, location, past insurance history, and documents), classify their eligibility for term insurance according to strict compliance rules.

CLASSIFICATION RULES:

1. AGE RULES:
   - Age <18 → Non-compliant
   - Age >65-70 → Non-compliant

2. OCCUPATION RULES:
   - High-risk jobs (miners, chemical/oil workers, combat forces, non-commercial pilots, deep-sea divers, stunt performers, journalists in war zones) → Non-compliant

3. HEALTH RULES:
   - Terminal illness → Non-compliant
   - Uncontrolled chronic conditions → Non-compliant
   - Severe mental health issues → Non-compliant
   - BMI <16 or >40 with illness → Non-compliant

4. LIFESTYLE RULES:
   - Alcohol/drug abuse → Non-compliant
   - Heavy smoking → Non-compliant
   - Extreme adventure sports → Non-compliant
   - Reckless driving → Non-compliant

5. LEGAL/CRIMINAL RULES:
   - Criminal records → Non-compliant
   - Terrorism links → Non-compliant
   - Sanctions → Non-compliant
   - AML flagged → Non-compliant

6. RESIDENCY RULES:
   - Foreign nationals without long-term Indian residency → Non-compliant

7. INCOME/EMPLOYMENT RULES:
   - Students without income → No Conclusive Evidence (needs review)
   - Homemakers without spouse cover → No Conclusive Evidence (needs review)
   - Unemployed → No Conclusive Evidence (needs review)
   - Daily wage workers → No Conclusive Evidence (needs review)

8. PAST INSURANCE HISTORY RULES:
   - Previous rejections → No Conclusive Evidence (needs review)
   - Multiple claim denials → No Conclusive Evidence (needs review)

9. LOCATION/TRAVEL RISK RULES:
   - Living or traveling in conflict areas → Non-compliant
   - Living or traveling in sanctioned areas → Non-compliant

10. OVER-INSURANCE RULES:
    - Cover requested exceeds reasonable multiple of income/assets → No Conclusive Evidence (needs review)

11. DOCUMENTATION RULES:
    - Missing PAN → Non-compliant
    - Missing Aadhaar → Non-compliant
    - Missing address proof → Non-compliant
    - Missing age proof → Non-compliant

CLASSIFICATION CATEGORIES:
1. "Compliant" → Fully eligible for standard term insurance
2. "Non-compliant" → Clearly ineligible based on rules
3. "No Conclusive Evidence" → Insufficient or unclear information; further review required

IMPORTANT INSTRUCTIONS:
- Classify into EXACTLY ONE of the three categories above
- Provide a brief reasoning citing which specific rule(s) led to the classification
- Be thorough in your analysis of all provided information
- If multiple rules apply, cite the most relevant ones
- Maintain strict adherence to the classification rules"""

    def classify_eligibility(self, user_profile: Dict[str, Any]) -> Dict[str, str]:
        """
        Classify insurance eligibility for a user profile

        Args:
            user_profile: Dictionary containing user information

        Returns:
            Dictionary with classification and reason
        """
        try:
            # Convert user profile to structured text
            profile_text = self._format_user_profile(user_profile)

            # Invoke the chain
            result = self.chain.invoke({"user_profile": profile_text})

            # Validate the result
            if (
                not isinstance(result, dict)
                or "classification" not in result
                or "reason" not in result
            ):
                raise ValueError("Invalid response format from model")

            # Ensure classification is one of the valid options
            valid_classifications = [
                "Compliant",
                "Non-compliant",
                "No Conclusive Evidence",
            ]
            if result["classification"] not in valid_classifications:
                raise ValueError(f"Invalid classification: {result['classification']}")

            return result

        except Exception as e:
            return {
                "classification": "No Conclusive Evidence",
                "reason": f"Error during classification: {str(e)}. Manual review required.",
            }

    def _format_user_profile(self, user_profile: Dict[str, Any]) -> str:
        """Format user profile data into readable text"""
        formatted_lines = []

        for key, value in user_profile.items():
            if isinstance(value, (list, dict)):
                value = json.dumps(value, indent=2)
            formatted_lines.append(f"{key.replace('_', ' ').title()}: {value}")

        return "\\n".join(formatted_lines)

    def classify_batch(self, user_profiles: list) -> list:
        """
        Classify multiple user profiles

        Args:
            user_profiles: List of user profile dictionaries

        Returns:
            List of classification results
        """
        results = []
        for i, profile in enumerate(user_profiles):
            try:
                result = self.classify_eligibility(profile)
                result["profile_index"] = i
                results.append(result)
            except Exception as e:
                results.append(
                    {
                        "profile_index": i,
                        "classification": "No Conclusive Evidence",
                        "reason": f"Error processing profile {i}: {str(e)}",
                    }
                )

        return results


def main():
    """Example usage of the Insurance Underwriter system"""

    # Initialize the underwriter
    underwriter = InsuranceUnderwriter(
        model_name="qwen3:4b",  # Change to your preferred Ollama model
        temperature=0.0,
    )

    # Test cases
    test_profiles = [
        # Case 1: Compliant profile
        {
            "age": 35,
            "occupation": "Software Engineer",
            "health_status": "Good health, no chronic conditions",
            "bmi": 24.5,
            "lifestyle": "Non-smoker, moderate alcohol consumption",
            "legal_status": "No criminal record",
            "residency": "Indian citizen with permanent residency",
            "employment_status": "Employed",
            "annual_income": 1200000,
            "coverage_requested": 5000000,
            "documents": ["PAN", "Aadhaar", "Address proof", "Age proof"],
            "insurance_history": "No previous applications",
        },
        # Case 2: Non-compliant - underage
        {
            "age": 16,
            "occupation": "Student",
            "health_status": "Good health",
            "lifestyle": "Non-smoker, no alcohol",
            "legal_status": "No criminal record",
            "residency": "Indian citizen",
            "documents": ["Aadhaar", "School ID"],
        },
        # Case 3: Non-compliant - high-risk occupation
        {
            "age": 28,
            "occupation": "Deep-sea diver",
            "health_status": "Good physical condition",
            "lifestyle": "Non-smoker, moderate alcohol",
            "legal_status": "No criminal record",
            "residency": "Indian citizen",
            "employment_status": "Employed",
            "annual_income": 800000,
            "documents": ["PAN", "Aadhaar", "Address proof", "Age proof"],
        },
        # Case 4: No Conclusive Evidence - missing documents
        {
            "age": 42,
            "occupation": "Teacher",
            "health_status": "Good health",
            "lifestyle": "Non-smoker, occasional alcohol",
            "legal_status": "No criminal record",
            "residency": "Indian citizen",
            "employment_status": "Employed",
            "annual_income": 600000,
            "documents": ["Aadhaar", "Address proof"],  # Missing PAN
        },
    ]

    print("=== Insurance Eligibility Classification System ===\\n")

    # Process each test case
    for i, profile in enumerate(test_profiles, 1):
        print(f"Test Case {i}:")
        print("Profile:", json.dumps(profile, indent=2))

        result = underwriter.classify_eligibility(profile)

        print("\\nClassification Result:")
        print(json.dumps(result, indent=2))
        print("-" * 50)

    # Batch processing example
    print("\\n=== Batch Processing Example ===")
    batch_results = underwriter.classify_batch(test_profiles)

    for result in batch_results:
        profile_idx = result.pop("profile_index")
        print(f"Profile {profile_idx + 1}: {result['classification']}")
        print(f"Reason: {result['reason']}")
        print()


if __name__ == "__main__":
    main()

=== Insurance Eligibility Classification System ===\n
Test Case 1:
Profile: {
  "age": 35,
  "occupation": "Software Engineer",
  "health_status": "Good health, no chronic conditions",
  "bmi": 24.5,
  "lifestyle": "Non-smoker, moderate alcohol consumption",
  "legal_status": "No criminal record",
  "residency": "Indian citizen with permanent residency",
  "employment_status": "Employed",
  "annual_income": 1200000,
  "coverage_requested": 5000000,
  "documents": [
    "PAN",
    "Aadhaar",
    "Address proof",
    "Age proof"
  ],
  "insurance_history": "No previous applications"
}
\nClassification Result:
{
  "classification": "Compliant",
  "reason": "User meets all criteria: Age 35 (within 18-65 range), occupation (software engineer) not high-risk, health status good, BMI 24.5 within healthy range, lifestyle (non-smoker, moderate alcohol), no criminal record, Indian permanent resident, employed with income, complete documents, no insurance history."
}
------------------------------

In [11]:
#!/usr/bin/env python3
"""
Enhanced Insurance Classification System with Tavily Search Fallback
"""

import json
import os
from typing import Dict, Any, Optional, List
from pydantic import BaseModel, Field
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnableLambda
from langchain_ollama import ChatOllama
from langchain_community.tools.tavily_search import TavilySearchResults


os.environ["TAVILY_API_KEY"] = "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"  # Set your Tavily API key here or in environment

# Enhanced response schema
class EnhancedInsuranceClassification(BaseModel):
    """Enhanced schema for insurance eligibility classification with search context"""

    classification: str = Field(
        description="Classification result: 'Compliant', 'Non-compliant', or 'No Conclusive Evidence'"
    )
    reason: str = Field(
        description="Brief explanation citing specific rules and external research"
    )
    search_context: Optional[str] = Field(
        default=None, description="Additional context from external search when needed"
    )
    confidence_score: float = Field(
        default=1.0, description="Confidence score from 0.0 to 1.0"
    )


class EnhancedInsuranceUnderwriter:
    """
    Enhanced Insurance eligibility classification system with Tavily search fallback
    """

    def __init__(
        self,
        model_name: str = "llama3.1",
        temperature: float = 0.0,
        tavily_api_key: Optional[str] = None,
    ):
        """
        Initialize the enhanced underwriter with search capabilities

        Args:
            model_name: Ollama model name
            temperature: Model temperature
            tavily_api_key: Tavily API key for web search
        """
        self.model_name = model_name
        self.temperature = temperature

        # Initialize primary LLM
        self.llm = ChatOllama(model=model_name, temperature=temperature, format="json")

        # Initialize search tool
        self.tavily_api_key = tavily_api_key or os.getenv("TAVILY_API_KEY")
        if self.tavily_api_key:
            self.search_tool = TavilySearchResults(
                api_key=self.tavily_api_key, max_results=5, search_depth="advanced"
            )
        else:
            self.search_tool = None
            print("Warning: Tavily API key not provided. Search fallback disabled.")

        # Set up parsers
        self.parser = JsonOutputParser(pydantic_object=EnhancedInsuranceClassification)

        # Create prompt templates
        self.system_prompt = self._create_system_prompt()
        self.search_prompt = self._create_search_prompt()

        # Primary classification chain
        self.primary_prompt = PromptTemplate(
            template="{system_prompt}\n\nUser Profile Data:\n{user_profile}\n\n{format_instructions}",
            input_variables=["user_profile"],
            partial_variables={
                "system_prompt": self.system_prompt,
                "format_instructions": self.parser.get_format_instructions(),
            },
        )

        # Enhanced classification chain with search context
        self.enhanced_prompt = PromptTemplate(
            template="{system_prompt}\n\nUser Profile Data:\n{user_profile}\n\nExternal Research Context:\n{search_context}\n\nBased on both the user profile and external research, provide your classification.\n\n{format_instructions}",
            input_variables=["user_profile", "search_context"],
            partial_variables={
                "system_prompt": self.system_prompt,
                "format_instructions": self.parser.get_format_instructions(),
            },
        )

        # Create chains with fallback mechanism
        self.primary_chain = self.primary_prompt | self.llm | self.parser
        self.enhanced_chain = self.enhanced_prompt | self.llm | self.parser

        # Fallback chain for when all else fails
        self.emergency_fallback = RunnableLambda(self._emergency_classification)

        # Main chain with comprehensive fallbacks
        self.main_chain = self.primary_chain.with_fallbacks([self.emergency_fallback])

    def _create_system_prompt(self) -> str:
        """Create the detailed system prompt"""
        return """You are a professional insurance underwriter with access to external research capabilities. Given a user's profile and potentially external research context, classify their eligibility for term insurance according to strict compliance rules.

CLASSIFICATION RULES:

1. AGE RULES:
   - Age <18 → Non-compliant
   - Age >65-70 → Non-compliant

2. OCCUPATION RULES:
   - High-risk jobs (miners, chemical/oil workers, combat forces, non-commercial pilots, deep-sea divers, stunt performers, journalists in war zones) → Non-compliant

3. HEALTH RULES:
   - Terminal illness → Non-compliant
   - Uncontrolled chronic conditions → Non-compliant
   - Severe mental health issues → Non-compliant
   - BMI <16 or >40 with illness → Non-compliant

4. LIFESTYLE RULES:
   - Alcohol/drug abuse → Non-compliant
   - Heavy smoking → Non-compliant
   - Extreme adventure sports → Non-compliant
   - Reckless driving → Non-compliant

5. LEGAL/CRIMINAL RULES:
   - Criminal records → Non-compliant
   - Terrorism links → Non-compliant
   - Sanctions → Non-compliant
   - AML flagged → Non-compliant

6. RESIDENCY RULES:
   - Foreign nationals without long-term Indian residency → Non-compliant

7. INCOME/EMPLOYMENT RULES:
   - Students without income → No Conclusive Evidence (needs review)
   - Homemakers without spouse cover → No Conclusive Evidence (needs review)
   - Unemployed → No Conclusive Evidence (needs review)
   - Daily wage workers → No Conclusive Evidence (needs review)

8. PAST INSURANCE HISTORY RULES:
   - Previous rejections → No Conclusive Evidence (needs review)
   - Multiple claim denials → No Conclusive Evidence (needs review)

9. LOCATION/TRAVEL RISK RULES:
   - Living or traveling in conflict areas → Non-compliant
   - Living or traveling in sanctioned areas → Non-compliant

10. OVER-INSURANCE RULES:
    - Cover requested exceeds reasonable multiple of income/assets → No Conclusive Evidence (needs review)

11. DOCUMENTATION RULES:
    - Missing PAN → Non-compliant
    - Missing Aadhaar → Non-compliant
    - Missing address proof → Non-compliant
    - Missing age proof → Non-compliant

ENHANCED INSTRUCTIONS:
- When external research context is provided, incorporate it into your decision-making
- Consider industry standards, regulatory changes, and current market conditions
- If uncertain about specific occupations, health conditions, or legal matters, use the research context
- Maintain strict adherence to classification rules while considering additional context
- Provide confidence scores based on available information

CLASSIFICATION CATEGORIES:
1. "Compliant" → Fully eligible for standard term insurance
2. "Non-compliant" → Clearly ineligible based on rules
3. "No Conclusive Evidence" → Insufficient information; further review required

Always provide reasoning that cites specific rules and any external research used."""

    def _create_search_prompt(self) -> str:
        """Create search queries for ambiguous cases"""
        return """Generate relevant search queries for insurance underwriting research about: {query_context}

Focus on:
- Current industry standards and regulations
- Risk assessment guidelines
- Occupation-specific insurance eligibility
- Health condition insurability
- Legal compliance requirements

Return 3-5 specific search queries."""

    def classify_eligibility(self, user_profile: Dict[str, Any]) -> Dict[str, str]:
        """
        Enhanced classification with search fallback
        """
        try:
            # Step 1: Primary classification
            profile_text = self._format_user_profile(user_profile)
            initial_result = self.primary_chain.invoke({"user_profile": profile_text})

            initial_result["classification"] = "No Conclusive Evidence"

            # Step 2: Check if we need additional research
            if (
                initial_result.get("classification") == "No Conclusive Evidence"
                and self.search_tool
                and initial_result.get("confidence_score", 1.0) < 1.0
            ):
                # Perform enhanced classification with search
                enhanced_result = self._classify_with_search(
                    user_profile, initial_result
                )
                return enhanced_result

            return self._format_result(initial_result)

        except Exception as e:
            # Final fallback
            return self._emergency_classification(
                {"user_profile": user_profile, "error": str(e)}
            )

    def _classify_with_search(
        self, user_profile: Dict[str, Any], initial_result: Dict[str, str]
    ) -> Dict[str, str]:
        """
        Perform classification with external search context
        """
        try:
            # Generate search queries based on the unclear aspects
            search_queries = self._generate_search_queries(user_profile, initial_result)

            # Perform searches
            search_context = self._perform_searches(search_queries)

            # Re-classify with search context
            profile_text = self._format_user_profile(user_profile)
            enhanced_result = self.enhanced_chain.invoke(
                {"user_profile": profile_text, "search_context": search_context}
            )

            # Add search context to result
            enhanced_result["search_context"] = (
                "External research conducted on: " + ", ".join(search_queries[:3])
            )

            return self._format_result(enhanced_result)

        except Exception as e:
            # Fallback to initial result with error note
            result = self._format_result(initial_result)
            result["reason"] += f" (Search enhancement failed: {str(e)})"
            return result

    def _generate_search_queries(
        self, user_profile: Dict[str, Any], initial_result: Dict[str, str]
    ) -> List[str]:
        """
        Generate targeted search queries based on unclear profile aspects
        """
        queries = []

        # Occupation-based queries
        if "occupation" in user_profile:
            occupation = user_profile["occupation"]
            queries.append(f"{occupation} insurance eligibility requirements India")
            queries.append(f"{occupation} occupational risk insurance underwriting")

        # Health-based queries
        if "health_status" in user_profile and any(
            keyword in str(user_profile["health_status"]).lower()
            for keyword in ["condition", "illness", "disease"]
        ):
            queries.append(
                f"insurance eligibility {user_profile['health_status']} India"
            )

        # Age-related queries
        if "age" in user_profile:
            age = user_profile["age"]
            if age > 60 or age < 25:
                queries.append(
                    f"term insurance age limit {age} years India requirements"
                )

        # Income-based queries
        if "employment_status" in user_profile:
            employment = user_profile["employment_status"]
            queries.append(f"{employment} insurance eligibility India underwriting")

        # Documentation queries
        if "documents" in user_profile:
            queries.append("term insurance required documents India 2025")

        return queries[:5]  # Limit to 5 queries

    def _perform_searches(self, queries: List[str]) -> str:
        """
        Perform web searches and compile results
        """
        if not self.search_tool:
            return "External search unavailable."

        all_results = []

        for query in queries:
            try:
                results = self.search_tool.run(query)
                if results:
                    all_results.append(f"Query: {query}\nResults: {results[:500]}...")
            except Exception as e:
                all_results.append(f"Query: {query}\nError: {str(e)}")

        return "\n\n".join(all_results)

    def _emergency_classification(self, inputs: Dict[str, Any]) -> Dict[str, str]:
        """
        Emergency fallback when all else fails
        """
        return {
            "classification": "No Conclusive Evidence",
            "reason": "System unable to process application due to technical issues. Manual underwriter review required immediately.",
            "search_context": None,
            "confidence_score": 0.0,
        }

    def _format_user_profile(self, user_profile: Dict[str, Any]) -> str:
        """Format user profile data into readable text"""
        formatted_lines = []
        for key, value in user_profile.items():
            if isinstance(value, (list, dict)):
                value = json.dumps(value, indent=2)
            formatted_lines.append(f"{key.replace('_', ' ').title()}: {value}")
        return "\n".join(formatted_lines)

    def _format_result(self, result: Dict[str, Any]) -> Dict[str, str]:
        """Format result for consistent output"""
        return {
            "classification": result.get("classification", "No Conclusive Evidence"),
            "reason": result.get("reason", "Unable to determine eligibility."),
            "search_context": result.get("search_context"),
            "confidence_score": result.get("confidence_score", 0.5),
        }

    def classify_batch_with_fallback(
        self, user_profiles: List[Dict[str, Any]]
    ) -> List[Dict[str, str]]:
        """
        Batch processing with individual fallback handling
        """
        results = []
        for i, profile in enumerate(user_profiles):
            try:
                result = self.classify_eligibility(profile)
                result["profile_index"] = i
                results.append(result)
            except Exception as e:
                results.append(
                    {
                        "profile_index": i,
                        "classification": "No Conclusive Evidence",
                        "reason": f"Critical error processing profile {i}: {str(e)}. Immediate manual review required.",
                        "search_context": None,
                        "confidence_score": 0.0,
                    }
                )

        return results


# Usage example
def main():
    """Example usage with fallback system"""

    # Initialize with Tavily API key
    underwriter = EnhancedInsuranceUnderwriter(
        model_name="qwen3:4b",
        temperature=0.0,
        tavily_api_key="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",  # Set your API key
    )

    # Test case requiring external search
    ambiguous_profile = {
        "age": 45,
        "occupation": "Cryptocurrency Trader",  # New/ambiguous occupation
        "health_status": "Recently recovered from COVID-19, some lingering fatigue",
        "lifestyle": "Works from home, moderate exercise",
        "legal_status": "No criminal record",
        "residency": "Indian citizen",
        "employment_status": "Self-employed",
        "annual_income": 2500000,
        "coverage_requested": 10000000,
        "documents": ["PAN", "Aadhaar", "Address proof", "Income proof"],
        "insurance_history": "No previous applications",
    }

    result = underwriter.classify_eligibility(ambiguous_profile)
    print("Enhanced Classification Result:")
    print(json.dumps(result, indent=2))


if __name__ == "__main__":
    main()


Enhanced Classification Result:
{
  "classification": "Compliant",
  "reason": "The user is seeking term insurance required documents in India for 2025. The search results show that term insurance requires standard documents like ID proof, address proof, age proof, income documents, recent photos, and nominee details. These documents align with the requirements for term insurance in India as per the latest sources (2025). The Indian insurance market has established clear document requirements for term insurance, and the search results provide specific details for 2025, indicating compliance with current regulations.",
  "search_context": "External research conducted on: Cryptocurrency Trader insurance eligibility requirements India, Cryptocurrency Trader occupational risk insurance underwriting, Self-employed insurance eligibility India underwriting",
  "confidence_score": 0.95
}


In [14]:
#!/usr/bin/env python3
"""
Complete Insurance Assessment System with LangGraph
- Node 1: Insurance Eligibility Classification  
- Node 2: Policy Recommendation (with Tavily search)
- Conditional routing based on eligibility
"""

import json
import os
from typing import Dict, Any, Optional, List, Literal
from pydantic import BaseModel, Field
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_ollama import ChatOllama
from langchain_community.tools.tavily_search import TavilySearchResults
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from typing_extensions import TypedDict

os.environ["TAVILY_API_KEY"] = "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"  # Set your Tavily API key here or in environment


# Define the state schema
class InsuranceAssessmentState(TypedDict):
    """State schema for the insurance assessment workflow"""
    user_profile: Dict[str, Any]
    classification_result: Optional[Dict[str, str]]
    policy_recommendations: Optional[Dict[str, Any]]
    search_context: Optional[str]
    error_messages: List[str]
    workflow_status: str

# Pydantic models for structured outputs
class InsuranceClassification(BaseModel):
    """Schema for insurance eligibility classification"""
    classification: str = Field(
        description="Classification result: 'Compliant', 'Non-compliant', or 'No Conclusive Evidence'"
    )
    reason: str = Field(
        description="Brief explanation citing specific rules"
    )
    confidence_score: float = Field(
        default=1.0,
        description="Confidence score from 0.0 to 1.0"
    )

class PolicyRecommendation(BaseModel):
    """Schema for policy recommendations"""
    recommended_policies: List[Dict[str, Any]] = Field(
        description="List of recommended policies with details"
    )
    comparison_summary: str = Field(
        description="Summary comparing the recommended policies"
    )
    best_match: Dict[str, Any] = Field(
        description="The best matching policy for the user"
    )
    research_sources: List[str] = Field(
        default=[],
        description="Sources used for policy research"
    )

class InsuranceAssessmentSystem:
    """
    Complete Insurance Assessment System using LangGraph
    """
    
    def __init__(self, 
                 model_name: str = "llama3.1",
                 temperature: float = 0.0,
                 tavily_api_key: Optional[str] = None):
        """
        Initialize the assessment system
        
        Args:
            model_name: Ollama model name
            temperature: Model temperature
            tavily_api_key: Tavily API key for web search
        """
        self.model_name = model_name
        self.temperature = temperature
        
        # Insurance companies to search
        self.insurance_companies = [
            "ICICI Prudential",
            "HDFC Life", 
            "Axis Max Life",
            "TATA AIA",
            "Bajaj Allianz",
            "Aditya Birla Sun Life",
            "SBI Life"
        ]
        
        # Initialize LLM
        self.llm = ChatOllama(
            model=model_name,
            temperature=temperature,
            format="json"
        )
        
        # Initialize search tool
        self.tavily_api_key = tavily_api_key or os.getenv("TAVILY_API_KEY")
        if self.tavily_api_key:
            self.search_tool = TavilySearchResults(
                api_key=self.tavily_api_key,
                max_results=8,
                search_depth="advanced"
            )
        else:
            self.search_tool = None
            print("Warning: Tavily API key not provided. Policy search disabled.")
        
        # Set up parsers
        self.classification_parser = JsonOutputParser(pydantic_object=InsuranceClassification)
        self.policy_parser = JsonOutputParser(pydantic_object=PolicyRecommendation)
        
        # Create prompt templates
        self._setup_prompts()
        
        # Create the workflow graph
        self.workflow = self._create_workflow()

    def _setup_prompts(self):
        """Setup all prompt templates"""
        
        # Classification prompt
        self.classification_prompt = PromptTemplate(
            template="""You are a professional insurance underwriter. Given a user's profile, classify their eligibility for term insurance according to strict compliance rules.

CLASSIFICATION RULES:
- Age: <18 → Non-compliant; >65–70 → Non-compliant
- Occupation: High-risk jobs (miners, chemical/oil workers, combat forces, non-commercial pilots, deep-sea divers, stunt performers, journalists in war zones) → Non-compliant
- Health: Terminal illness, uncontrolled chronic conditions, severe mental health issues, BMI <16 or >40 with illness → Non-compliant
- Lifestyle: Alcohol/drug abuse, heavy smoking, extreme adventure sports, reckless driving → Non-compliant
- Legal/Criminal: Criminal records, terrorism links, sanctions, AML flagged → Non-compliant
- Residency: Foreign nationals without long-term Indian residency → Non-compliant
- Income/Employment: Students without income, homemakers without spouse cover, unemployed, daily wage workers → No Conclusive Evidence (needs review)
- Past Insurance History: Previous rejections or multiple claim denials → No Conclusive Evidence
- Location/Travel Risk: Living or traveling in conflict or sanctioned areas → Non-compliant
- Over-insurance: Cover requested exceeds reasonable multiple of income/assets → No Conclusive Evidence
- Documentation: Missing PAN, Aadhaar, address proof, or age proof → Non-compliant

CLASSIFICATION CATEGORIES:
1. "Compliant" → Fully eligible for standard term insurance
2. "Non-compliant" → Clearly ineligible based on rules  
3. "No Conclusive Evidence" → Insufficient or unclear information; further review required

User Profile:
{user_profile}

{format_instructions}""",
            input_variables=["user_profile"],
            partial_variables={
                "format_instructions": self.classification_parser.get_format_instructions()
            }
        )
        
        # Policy recommendation prompt
        self.policy_prompt = PromptTemplate(
            template="""You are an expert insurance advisor. Based on the user's profile and external research about term life insurance policies, recommend the best policies from the researched companies.

User Profile:
{user_profile}

External Research Results:
{search_results}

Available Insurance Companies: {companies}

Instructions:
- Analyze the research results to identify the best term life insurance policies
- Consider factors like: premium rates, coverage amounts, claim settlement ratio, features, riders available
- Match policies to user's specific needs (age, income, coverage amount requested)
- Provide detailed comparison focusing on value proposition
- Recommend top 3 policies with clear reasoning
- Identify the single best match for this user's profile

{format_instructions}""",
            input_variables=["user_profile", "search_results", "companies"],
            partial_variables={
                "format_instructions": self.policy_parser.get_format_instructions()
            }
        )

    def _create_workflow(self) -> StateGraph:
        """Create the LangGraph workflow"""
        
        # Create the state graph
        workflow = StateGraph(InsuranceAssessmentState)
        
        # Add nodes
        workflow.add_node("classify_eligibility", self._classify_eligibility_node)
        workflow.add_node("recommend_policies", self._recommend_policies_node)
        workflow.add_node("handle_non_compliant", self._handle_non_compliant_node)
        workflow.add_node("handle_inconclusive", self._handle_inconclusive_node)
        
        # Add edges
        workflow.set_entry_point("classify_eligibility")
        
        # Add conditional edges based on classification result
        workflow.add_conditional_edges(
            "classify_eligibility",
            self._route_after_classification,
            {
                "compliant": "recommend_policies",
                "non_compliant": "handle_non_compliant", 
                "inconclusive": "handle_inconclusive"
            }
        )
        
        # All paths lead to END
        workflow.add_edge("recommend_policies", END)
        workflow.add_edge("handle_non_compliant", END)
        workflow.add_edge("handle_inconclusive", END)
        
        return workflow.compile()

    def _classify_eligibility_node(self, state: InsuranceAssessmentState) -> InsuranceAssessmentState:
        """Node 1: Insurance Eligibility Classification"""
        try:
            # Format user profile
            profile_text = self._format_user_profile(state["user_profile"])
            
            # Create classification chain
            classification_chain = self.classification_prompt | self.llm | self.classification_parser
            
            # Classify eligibility
            result = classification_chain.invoke({"user_profile": profile_text})
            
            # Update state
            state["classification_result"] = {
                "classification": result["classification"],
                "reason": result["reason"],
                "confidence_score": result.get("confidence_score", 1.0)
            }
            state["workflow_status"] = "classification_complete"
            
            return state
            
        except Exception as e:
            state["error_messages"].append(f"Classification error: {str(e)}")
            state["classification_result"] = {
                "classification": "No Conclusive Evidence",
                "reason": f"System error during classification: {str(e)}",
                "confidence_score": 0.0
            }
            state["workflow_status"] = "classification_error"
            return state

    def _recommend_policies_node(self, state: InsuranceAssessmentState) -> InsuranceAssessmentState:
        """Node 2: Policy Recommendation with Tavily Search"""
        try:
            if not self.search_tool:
                state["error_messages"].append("Search tool not available")
                state["policy_recommendations"] = {
                    "error": "Cannot search for policies - Tavily API key not provided"
                }
                return state
            
            # Generate search queries for policy research
            search_queries = self._generate_policy_search_queries(state["user_profile"])
            
            # Perform searches
            search_results = self._perform_policy_searches(search_queries)
            
            # Create policy recommendation chain
            policy_chain = self.policy_prompt | self.llm | self.policy_parser
            
            # Get recommendations
            result = policy_chain.invoke({
                "user_profile": self._format_user_profile(state["user_profile"]),
                "search_results": search_results,
                "companies": ", ".join(self.insurance_companies)
            })
            
            # Update state
            state["policy_recommendations"] = {
                "recommended_policies": result["recommended_policies"],
                "comparison_summary": result["comparison_summary"],
                "best_match": result["best_match"],
                "research_sources": result.get("research_sources", [])
            }
            state["search_context"] = search_results[:1000] + "..." if len(search_results) > 1000 else search_results
            state["workflow_status"] = "recommendations_complete"
            
            return state
            
        except Exception as e:
            state["error_messages"].append(f"Policy recommendation error: {str(e)}")
            state["policy_recommendations"] = {
                "error": f"Unable to generate policy recommendations: {str(e)}"
            }
            state["workflow_status"] = "recommendation_error"
            return state

    def _handle_non_compliant_node(self, state: InsuranceAssessmentState) -> InsuranceAssessmentState:
        """Node: Handle Non-Compliant Cases"""
        state["policy_recommendations"] = {
            "message": "Unfortunately, based on the eligibility assessment, you do not qualify for standard term life insurance at this time.",
            "reason": state["classification_result"]["reason"],
            "suggestions": [
                "Consider addressing the specific compliance issues identified",
                "Consult with an insurance advisor for alternative insurance products",
                "Re-apply after resolving the eligibility concerns"
            ]
        }
        state["workflow_status"] = "non_compliant_handled"
        return state

    def _handle_inconclusive_node(self, state: InsuranceAssessmentState) -> InsuranceAssessmentState:
        """Node: Handle Inconclusive Cases"""
        state["policy_recommendations"] = {
            "message": "Your application requires manual review by an underwriter.",
            "reason": state["classification_result"]["reason"],
            "next_steps": [
                "Submit additional documentation as requested",
                "Schedule a medical examination if required",
                "Provide clarification on flagged items",
                "Wait for manual underwriter assessment"
            ],
            "estimated_review_time": "5-10 business days"
        }
        state["workflow_status"] = "inconclusive_handled"
        return state

    def _route_after_classification(self, state: InsuranceAssessmentState) -> Literal["compliant", "non_compliant", "inconclusive"]:
        """Conditional routing logic based on classification result"""
        classification = state["classification_result"]["classification"]
        
        if classification == "Compliant":
            return "compliant"
        elif classification == "Non-compliant":
            return "non_compliant"
        else:  # "No Conclusive Evidence"
            return "inconclusive"

    def _generate_policy_search_queries(self, user_profile: Dict[str, Any]) -> List[str]:
        """Generate targeted search queries for policy research"""
        queries = []
        
        # Get user specifics
        age = user_profile.get("age", 30)
        coverage = user_profile.get("coverage_requested", 5000000)
        income = user_profile.get("annual_income", 1000000)
        
        # Base queries for each company
        for company in self.insurance_companies:
            queries.append(f"{company} term life insurance plans 2025 India")
            queries.append(f"{company} term insurance premium rates age {age}")
        
        # Coverage-specific queries
        queries.append(f"best term life insurance {coverage} coverage India 2025")
        queries.append(f"term insurance comparison ICICI HDFC Axis TATA Bajaj SBI 2025")
        queries.append(f"lowest premium term insurance India {age} years old")
        queries.append(f"highest claim settlement ratio term insurance India 2025")
        
        return queries

    def _perform_policy_searches(self, queries: List[str]) -> str:
        """Perform web searches for policy information"""
        all_results = []
        
        for query in queries[:15]:  # Limit to prevent API overuse
            try:
                results = self.search_tool.run(query)
                if results:
                    all_results.append(f"Query: {query}\nResults: {results}")
            except Exception as e:
                all_results.append(f"Query: {query}\nError: {str(e)}")
        
        return "\n\n".join(all_results)

    def _format_user_profile(self, user_profile: Dict[str, Any]) -> str:
        """Format user profile data into readable text"""
        formatted_lines = []
        for key, value in user_profile.items():
            if isinstance(value, (list, dict)):
                value = json.dumps(value, indent=2)
            formatted_lines.append(f"{key.replace('_', ' ').title()}: {value}")
        return "\n".join(formatted_lines)

    def assess_insurance_application(self, user_profile: Dict[str, Any]) -> Dict[str, Any]:
        """
        Main method to assess insurance application through the complete workflow
        
        Args:
            user_profile: Dictionary containing user information
            
        Returns:
            Complete assessment results including classification and recommendations
        """
        # Initialize state
        initial_state = {
            "user_profile": user_profile,
            "classification_result": None,
            "policy_recommendations": None,
            "search_context": None,
            "error_messages": [],
            "workflow_status": "started"
        }
        
        # Run the workflow
        final_state = self.workflow.invoke(initial_state)
        
        # Format final response
        return {
            "eligibility_assessment": final_state["classification_result"],
            "policy_recommendations": final_state["policy_recommendations"],
            "workflow_status": final_state["workflow_status"],
            "search_context_available": final_state["search_context"] is not None,
            "errors": final_state["error_messages"] if final_state["error_messages"] else None
        }

# Usage example and test cases
def main():
    """Example usage of the complete assessment system"""
    
    # Initialize the system
    assessment_system = InsuranceAssessmentSystem(
        model_name="qwen3:4b",
        temperature=0.0,
        tavily_api_key="your-tavily-api-key"  # Set your API key
    )
    
    # Test cases
    test_profiles = [
        # Case 1: Compliant user - should get policy recommendations
        {
            "age": 32,
            "occupation": "Software Engineer",
            "health_status": "Excellent health, regular checkups",
            "bmi": 23.5,
            "lifestyle": "Non-smoker, occasional social drinking, regular exercise",
            "legal_status": "No criminal record",
            "residency": "Indian citizen with permanent address in Mumbai",
            "employment_status": "Employed full-time",
            "annual_income": 1800000,
            "coverage_requested": 8000000,
            "documents": ["PAN", "Aadhaar", "Address proof", "Age proof", "Income proof"],
            "insurance_history": "No previous term insurance",
            "family_medical_history": "No hereditary conditions"
        },
        
        # Case 2: Non-compliant user - should get rejection handling
        {
            "age": 16,
            "occupation": "Student",
            "health_status": "Good health",
            "lifestyle": "Non-smoker, no alcohol",
            "legal_status": "No criminal record",
            "residency": "Indian citizen",
            "documents": ["Aadhaar", "School certificate"]
        },
        
        # Case 3: Inconclusive case - should get manual review guidance
        {
            "age": 45,
            "occupation": "Freelance Consultant",
            "health_status": "Recently recovered from surgery, currently healthy",
            "lifestyle": "Non-smoker, moderate alcohol",
            "legal_status": "No criminal record",
            "residency": "Indian citizen",
            "employment_status": "Self-employed",
            "annual_income": 1200000,
            "coverage_requested": 15000000,  # High coverage ratio
            "documents": ["PAN", "Aadhaar", "Address proof"]  # Missing some docs
        }
    ]
    
    print("=== Complete Insurance Assessment System ===\n")
    
    # Process each test case
    for i, profile in enumerate(test_profiles, 1):
        print(f"Test Case {i}:")
        print("Profile:", json.dumps({k: v for k, v in profile.items() if k not in ['documents']}, indent=2))
        print(f"Documents: {profile.get('documents', [])}")
        
        # Run complete assessment
        result = assessment_system.assess_insurance_application(profile)
        
        print("\n=== ASSESSMENT RESULTS ===")
        print("Eligibility:", json.dumps(result["eligibility_assessment"], indent=2))
        print("\nPolicy Recommendations:", json.dumps(result["policy_recommendations"], indent=2))
        print(f"\nWorkflow Status: {result['workflow_status']}")
        print(f"External Search Used: {result['search_context_available']}")
        
        if result["errors"]:
            print(f"Errors: {result['errors']}")
        
        print("=" * 80)

if __name__ == "__main__":
    main()


=== Complete Insurance Assessment System ===

Test Case 1:
Profile: {
  "age": 32,
  "occupation": "Software Engineer",
  "health_status": "Excellent health, regular checkups",
  "bmi": 23.5,
  "lifestyle": "Non-smoker, occasional social drinking, regular exercise",
  "legal_status": "No criminal record",
  "residency": "Indian citizen with permanent address in Mumbai",
  "employment_status": "Employed full-time",
  "annual_income": 1800000,
  "coverage_requested": 8000000,
  "insurance_history": "No previous term insurance",
  "family_medical_history": "No hereditary conditions"
}
Documents: ['PAN', 'Aadhaar', 'Address proof', 'Age proof', 'Income proof']

=== ASSESSMENT RESULTS ===
Eligibility: {
  "classification": "Compliant",
  "reason": "Age (32) within 18-65 range; Occupation (Software Engineer) not high-risk; Health (Excellent, BMI 23.5) meets standards; Lifestyle (Non-smoker, occasional drinking) within limits; Legal status clear; Residency (Indian citizen with permanent addre

In [17]:
import json
import os
import pandas as pd
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np

# Load documents
with open(r"C:\Users\arnab\Downloads\insurance_rag_documents.json", 'r', encoding='utf-8') as f:
    documents = json.load(f)

# Metadata keys to store for retrieval
metadata_keys = [
    "insurer", "metric_type", "sheet_name", "source", "document_type", "type"
]

# Load sentence transformer for embeddings
embedder = SentenceTransformer('all-MiniLM-L6-v2')  # Use a suitable model for production

# Prepare data lists
texts = []
metadata_list = []

for doc in documents:
    texts.append(doc['content'])
    meta = {k: doc.get(k, None) for k in metadata_keys}
    metadata_list.append(meta)

# Embed all texts
embeddings = embedder.encode(texts, show_progress_bar=True)
embeddings = np.array(embeddings).astype('float32')

# Create FAISS index
dim = embeddings.shape[1]
index = faiss.IndexFlatL2(dim)
index.add(embeddings)  # Add vectors

# Store metadata mapping
metadata_df = pd.DataFrame(metadata_list)
metadata_df.to_csv('faiss_insurance_metadata.csv', index=False)

# For persistence, save FAISS index to disk
faiss.write_index(index, 'faiss_insurance_index.idx')

# -------- Retrieval Example --------
def search(query, k=5):
    query_vec = embedder.encode([query]).astype('float32')
    D, I = index.search(query_vec, k)
    results = []
    for idx in I[0]:
        res = {
            "content": texts[idx],
            "metadata": metadata_list[idx]
        }
        results.append(res)
    return results

# Example usage
results = search("LIC solvency ratio 2024", k=3)
for res in results:
    print(res['metadata'])
    print(res['content'][:300])
    print("---")


To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development
Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`
Batches: 100%|██████████| 6/6 [00:11<00:00,  1.97s/it]


{'insurer': None, 'metric_type': 'solvency_ratio_summary', 'sheet_name': 'Solvency Ratio Summary', 'source': 'Insurance_DB_V1.xlsx', 'document_type': 'summary', 'type': 'insurance_data'}
Insurance Solvency Ratios Summary:
The solvency ratio measures an insurance company's ability to meet its long-term obligations. A ratio above 1.5 is typically considered healthy.

Key insights from the data:
- Life Insurance Corporation of India (LIC) shows consistent solvency ratios ranging from 1
---
{'insurer': None, 'metric_type': 'claim_settlement_ratio', 'sheet_name': 'Claim Settlement Ratio', 'source': 'Insurance_DB_V1.xlsx', 'document_type': None, 'type': 'insurance_data'}
Company: Life Insurance Corporation of India (LIC)
Claim Settlement Ratio Data:
- CSR 2020: 96.69%
- CSR 2021: 98.62%
- CSR 2022: 98.74%
- CSR 2023: 98.6%
- CSR 2024: 93.48%

---
{'insurer': 'Future Generali India Life Insurance Company Ltd.', 'metric_type': 'solvency_ratio', 'sheet_name': 'Solvency Ratio', 'source': 'Insura

In [20]:

import json
import os
from typing import List, Dict, Any
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document
import pandas as pd

class InsuranceRAGIngester:
    """
    LangChain-based ingestion pipeline for insurance documents into FAISS vector database
    """

    def __init__(self, embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2"):
        """
        Initialize the ingester with embedding model

        Args:
            embedding_model: HuggingFace embedding model name
        """
        self.embeddings = HuggingFaceEmbeddings(
            model_name=embedding_model,
            model_kwargs={'device': 'cpu'},  # Change to 'cuda' if GPU available
            encode_kwargs={'normalize_embeddings': True}
        )
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
            separators=["\n\n", "\n", " ", ""]
        )

    def load_documents(self, json_file_path: str) -> List[Document]:
        """
        Load insurance documents from JSON file and convert to LangChain Documents

        Args:
            json_file_path: Path to the JSON file containing insurance documents

        Returns:
            List of LangChain Document objects with metadata
        """
        with open(json_file_path, 'r', encoding='utf-8') as f:
            raw_data = json.load(f)

        documents = []
        for doc_data in raw_data:
            # Create metadata dictionary
            metadata = {
                "insurer": doc_data.get("insurer", ""),
                "metric_type": doc_data.get("metric_type", ""),
                "sheet_name": doc_data.get("sheet_name", ""),
                "source": doc_data.get("source", ""),
                "document_type": doc_data.get("document_type", ""),
                "type": doc_data.get("type", ""),
                "company": doc_data.get("company", "")
            }

            # Create LangChain Document
            doc = Document(
                page_content=doc_data.get("content", ""),
                metadata=metadata
            )
            documents.append(doc)

        return documents

    def split_documents(self, documents: List[Document]) -> List[Document]:
        """
        Split documents into smaller chunks for better retrieval

        Args:
            documents: List of LangChain Document objects

        Returns:
            List of split Document objects
        """
        return self.text_splitter.split_documents(documents)

    def create_vectorstore(self, documents: List[Document], persist_directory: str = None) -> FAISS:
        """
        Create FAISS vectorstore from documents

        Args:
            documents: List of LangChain Document objects
            persist_directory: Directory to save the vectorstore

        Returns:
            FAISS vectorstore object
        """
        # Create FAISS vectorstore
        vectorstore = FAISS.from_documents(
            documents=documents,
            embedding=self.embeddings
        )

        # Save vectorstore locally if persist_directory is provided
        if persist_directory:
            os.makedirs(persist_directory, exist_ok=True)
            vectorstore.save_local(persist_directory)
            print(f"Vectorstore saved to {persist_directory}")

        return vectorstore

    def ingest_documents(self, json_file_path: str, persist_directory: str = "faiss_insurance_db", 
                        split_documents: bool = True) -> FAISS:
        """
        Complete ingestion pipeline

        Args:
            json_file_path: Path to JSON file with insurance documents
            persist_directory: Directory to save the vectorstore
            split_documents: Whether to split documents into chunks

        Returns:
            FAISS vectorstore object
        """
        print("Loading documents...")
        documents = self.load_documents(json_file_path)
        print(f"Loaded {len(documents)} documents")

        if split_documents:
            print("Splitting documents...")
            documents = self.split_documents(documents)
            print(f"Split into {len(documents)} chunks")

        print("Creating vectorstore...")
        vectorstore = self.create_vectorstore(documents, persist_directory)
        print("Ingestion complete!")

        return vectorstore

class InsuranceRAGRetriever:
    """
    LangChain-based retrieval system for insurance documents
    """

    def __init__(self, vectorstore_path: str = "faiss_insurance_db"):
        """
        Initialize retriever with saved vectorstore

        Args:
            vectorstore_path: Path to saved FAISS vectorstore
        """
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2",
            model_kwargs={'device': 'cpu'},
            encode_kwargs={'normalize_embeddings': True}
        )

        # Load existing vectorstore
        self.vectorstore = FAISS.load_local(
            vectorstore_path, 
            self.embeddings,
            allow_dangerous_deserialization=True
        )

    def search(self, query: str, k: int = 5, filter_metadata: Dict[str, Any] = None) -> List[Document]:
        """
        Search for relevant documents

        Args:
            query: Search query
            k: Number of documents to return
            filter_metadata: Optional metadata filters

        Returns:
            List of relevant Document objects
        """
        if filter_metadata:
            # Use metadata filtering if provided
            retriever = self.vectorstore.as_retriever(
                search_type="similarity",
                search_kwargs={"k": k, "filter": filter_metadata}
            )
        else:
            retriever = self.vectorstore.as_retriever(
                search_type="similarity",
                search_kwargs={"k": k}
            )

        return retriever.get_relevant_documents(query)

    def search_with_scores(self, query: str, k: int = 5) -> List[tuple]:
        """
        Search with similarity scores

        Args:
            query: Search query
            k: Number of documents to return

        Returns:
            List of (Document, score) tuples
        """
        return self.vectorstore.similarity_search_with_score(query, k=k)

# Usage Example
def main():
    """
    Example usage of the insurance RAG ingestion and retrieval system
    """

    # Initialize ingester
    ingester = InsuranceRAGIngester()

    # Ingest documents
    vectorstore = ingester.ingest_documents(
        json_file_path=r"C:\Users\arnab\Downloads\insurance_rag_documents.json",
        persist_directory="faiss_insurance_db",
        split_documents=True
    )

    # Initialize retriever
    retriever = InsuranceRAGRetriever("faiss_insurance_db")

    # Example searches
    print("\n=== Example Searches ===")

    # Basic search
    results = retriever.search("LIC solvency ratio 2024", k=3)
    print(f"\nFound {len(results)} results for 'LIC solvency ratio 2024':")
    for i, doc in enumerate(results):
        print(f"Result {i+1}:")
        print(f"Insurer: {doc.metadata.get('insurer', 'N/A')}")
        print(f"Metric: {doc.metadata.get('metric_type', 'N/A')}")
        print(f"Content: {doc.page_content[:200]}...")
        print("-" * 50)

    # Filtered search
    filter_criteria = {"metric_type": "solvency_ratio", "insurer": "Life Insurance Corporation of India"}
    filtered_results = retriever.search(
        "solvency ratio trends", 
        k=2, 
        filter_metadata=filter_criteria
    )
    print(f"\nFiltered search results: {len(filtered_results)}")

    # Search with scores
    scored_results = retriever.search_with_scores("claim settlement ratio HDFC", k=2)
    print(f"\nSearch with scores:")
    for doc, score in scored_results:
        print(f"Score: {score:.4f}")
        print(f"Insurer: {doc.metadata.get('insurer', 'N/A')}")
        print(f"Content: {doc.page_content}...")
        print("-" * 30)

if __name__ == "__main__":
    main()


Loading documents...
Loaded 168 documents
Splitting documents...
Split into 196 chunks
Creating vectorstore...
Vectorstore saved to faiss_insurance_db
Ingestion complete!

=== Example Searches ===

Found 3 results for 'LIC solvency ratio 2024':
Result 1:
Insurer: 
Metric: solvency_ratio_summary
Content: Insurance Solvency Ratios Summary:
The solvency ratio measures an insurance company's ability to meet its long-term obligations. A ratio above 1.5 is typically considered healthy.

Key insights from t...
--------------------------------------------------
Result 2:
Insurer: 
Metric: claim_settlement_ratio
Content: Company: Life Insurance Corporation of India (LIC)
Claim Settlement Ratio Data:
- CSR 2020: 96.69%
- CSR 2021: 98.62%
- CSR 2022: 98.74%
- CSR 2023: 98.6%
- CSR 2024: 93.48%...
--------------------------------------------------
Result 3:
Insurer: Future Generali India Life Insurance Company Ltd.
Metric: solvency_ratio
Content: Insurer: Future Generali India Life Insurance Compa

In [None]:

import json
import os
from typing import List, Dict, Any
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document
# from langchain_community.llms import Ollama
# from langchain_openai import ChatOpenAI
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.chains import create_retrieval_chain
from langchain.callbacks import StdOutCallbackHandler
import pandas as pd

class AIInsuranceAdvisor:
    """
    AI-enabled insurance advisor using LangChain with LLM for intelligent decision making
    """

    def __init__(self, llm_type: str = "ollama", model_name: str = "qwen3:4b", 
                 embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2"):
        """
        Initialize the AI Insurance Advisor

        Args:
            llm_type: Type of LLM to use ('ollama', 'openai', 'huggingface')
            model_name: Name of the model
            embedding_model: HuggingFace embedding model name
        """
        self.embeddings = HuggingFaceEmbeddings(
            model_name=embedding_model,
            model_kwargs={'device': 'cpu'},
            encode_kwargs={'normalize_embeddings': True}
        )

        # Initialize LLM based on type
        if llm_type == "ollama":
            self.llm = Ollama(
                model=model_name,
                temperature=0.1,
                callbacks=[StdOutCallbackHandler()]
            )
        elif llm_type == "openai":
            self.llm = None

        # System prompt for insurance evaluation
        self.system_prompt = """
        You are an expert insurance advisor specialized in term life insurance policy evaluation and recommendation based on robust IRDA data metrics. 

        Using the latest IRDA reports, your task is to identify and recommend the top 3 term insurance policies driven by a transparent, weighted scoring methodology that balances insurer financial strength, claims performance, customer retention, and operational excellence.

        **Scoring Factors & Weights:**
        - Financial Strength (25%): Solvency Ratio - Reflects the insurer's ability to meet long-term obligations. Score ranges from 0 (below 1.5) to 100 (above 2.5).
        - Claims Performance (40%): 
          * Claim Settlement Ratio (20%): Measures overall claims approval reliability
          * Death Claim Settlement Ratio (20%): Focuses on term insurance death claims
        - Customer Retention & Satisfaction (10%): Persistency Ratio - Indicates policyholder renewal rates
        - Operational Excellence (10%): Grievance Redressal & Resolution Rate - Quantifies complaint handling effectiveness

        **Composite Scoring Formula:**
        Total Score = (Solvency Ratio Score × 0.25) + (Claim Settlement Ratio Score × 0.20) + (Death Claim Settlement Ratio Score × 0.20) + (Persistency Ratio Score × 0.10) + (Grievance Resolution Score × 0.10)

        Based on the provided insurance data, analyze each insurer's performance across these metrics and provide:
        1. Top 3 recommended term insurance policies
        2. Detailed scoring breakdown for each recommendation
        3. Justification for each recommendation based on the scoring methodology
        4. Any risk factors or considerations for each recommended policy

        Context: {context}

        Question: {input}

        Provide a comprehensive analysis with specific data points and calculations.
        """

    def load_vectorstore(self, vectorstore_path: str = "faiss_insurance_db"):
        """
        Load existing FAISS vectorstore

        Args:
            vectorstore_path: Path to saved FAISS vectorstore
        """
        try:
            self.vectorstore = FAISS.load_local(
                vectorstore_path,
                self.embeddings,
                allow_dangerous_deserialization=True
            )
            print(f"Vectorstore loaded from {vectorstore_path}")
        except Exception as e:
            print(f"Error loading vectorstore: {e}")
            print("Please run the ingestion process first")
            raise

    def create_retrieval_chain(self):
        """
        Create retrieval chain with custom prompt template
        """
        # Create prompt template
        prompt = PromptTemplate(
            template=self.system_prompt,
            input_variables=["context", "input"]
        )

        # Create document chain
        document_chain = create_stuff_documents_chain(self.llm, prompt)

        # Create retriever
        retriever = self.vectorstore.as_retriever(
            search_type="similarity",
            search_kwargs={"k": 10}  # Retrieve more documents for comprehensive analysis
        )

        # Create retrieval chain
        self.retrieval_chain = create_retrieval_chain(retriever, document_chain)

    def get_insurance_recommendations(self, query: str = None) -> str:
        """
        Get AI-powered insurance recommendations

        Args:
            query: Optional specific query, defaults to general recommendation request

        Returns:
            AI-generated insurance recommendations
        """
        if query is None:
            query = """
            Please analyze all available insurance companies and provide the top 3 term life insurance policy recommendations based on the IRDA scoring methodology. 
            Include detailed scoring breakdowns, financial metrics analysis, and specific reasons for each recommendation.
            Focus on the latest available data for each metric.
            """

        try:
            response = self.retrieval_chain.invoke({"input": query})
            return response["answer"]
        except Exception as e:
            return f"Error generating recommendations: {e}"

    def analyze_specific_insurer(self, insurer_name: str) -> str:
        """
        Analyze a specific insurance company

        Args:
            insurer_name: Name of the insurance company to analyze

        Returns:
            AI-generated analysis of the specific insurer
        """
        query = f"""
        Please provide a detailed analysis of {insurer_name} based on the IRDA scoring methodology.
        Include their performance across all metrics: solvency ratio, claim settlement ratio, 
        death claim settlement ratio, persistency ratio, and grievance resolution.
        Calculate their composite score and provide recommendations.
        """

        try:
            response = self.retrieval_chain.invoke({"input": query})
            return response["answer"]
        except Exception as e:
            return f"Error analyzing {insurer_name}: {e}"

    def compare_insurers(self, insurer_list: List[str]) -> str:
        """
        Compare multiple insurance companies

        Args:
            insurer_list: List of insurance company names to compare

        Returns:
            AI-generated comparison analysis
        """
        insurers_str = ", ".join(insurer_list)
        query = f"""
        Please compare the following insurance companies: {insurers_str}

        Provide a detailed comparison based on the IRDA scoring methodology including:
        1. Side-by-side metric comparison
        2. Composite scores for each insurer
        3. Strengths and weaknesses of each
        4. Ranking and recommendation among these options
        5. Risk assessment for each option
        """

        try:
            response = self.retrieval_chain.invoke({"input": query})
            return response["answer"]
        except Exception as e:
            return f"Error comparing insurers: {e}"

    def get_metric_insights(self, metric_type: str) -> str:
        """
        Get insights about a specific metric across all insurers

        Args:
            metric_type: Type of metric to analyze (solvency_ratio, claim_settlement_ratio, etc.)

        Returns:
            AI-generated insights about the metric
        """
        query = f"""
        Please analyze the {metric_type} performance across all insurance companies.
        Provide insights including:
        1. Industry average and trends
        2. Best and worst performers
        3. What constitutes good vs poor performance for this metric
        4. Impact on overall insurance recommendation
        5. Recent trends and changes in this metric
        """

        try:
            response = self.retrieval_chain.invoke({"input": query})
            return response["answer"]
        except Exception as e:
            return f"Error analyzing {metric_type}: {e}"

class InteractiveInsuranceChat:
    """
    Interactive chat interface for insurance advisory
    """

    def __init__(self, advisor: AIInsuranceAdvisor):
        self.advisor = advisor

    def start_chat(self):
        """
        Start interactive chat session
        """
        print("=== AI Insurance Advisor Chat ===")
        print("Ask me anything about insurance policies, companies, or get recommendations!")
        print("Type 'quit' to exit, 'help' for commands")
        print()

        while True:
            user_input = input("You: ").strip()

            if user_input.lower() == 'quit':
                print("Thank you for using AI Insurance Advisor!")
                break
            elif user_input.lower() == 'help':
                self.show_help()
            elif user_input.lower().startswith('recommend'):
                response = self.advisor.get_insurance_recommendations()
                print(f"\nAI Advisor: {response}\n")
            elif user_input.lower().startswith('analyze'):
                # Extract company name from input
                parts = user_input.split(' ', 1)
                if len(parts) > 1:
                    company = parts[1]
                    response = self.advisor.analyze_specific_insurer(company)
                    print(f"\nAI Advisor: {response}\n")
                else:
                    print("Please specify a company name after 'analyze'")
            elif user_input.lower().startswith('compare'):
                # Extract company names from input
                parts = user_input.split(' ', 1)
                if len(parts) > 1:
                    companies = [c.strip() for c in parts[1].split(',')]
                    response = self.advisor.compare_insurers(companies)
                    print(f"\nAI Advisor: {response}\n")
                else:
                    print("Please specify company names after 'compare' (comma-separated)")
            else:
                # General query
                response = self.advisor.get_insurance_recommendations(user_input)
                print(f"\nAI Advisor: {response}\n")

    def show_help(self):
        """
        Show available commands
        """
        print("""
        Available commands:
        - 'recommend' or 'recommendations' - Get top 3 insurance policy recommendations
        - 'analyze [company name]' - Analyze a specific insurance company
        - 'compare [company1, company2, ...]' - Compare multiple companies
        - 'help' - Show this help message
        - 'quit' - Exit the chat

        You can also ask any general questions about insurance policies!
        """)

# Usage Example
def main():
    """
    Main function to demonstrate the AI Insurance Advisor
    """
    # Initialize AI advisor
    print("Initializing AI Insurance Advisor...")
    advisor = AIInsuranceAdvisor(llm_type="ollama", model_name="Qwen3:4b")

    # Load vectorstore
    try:
        advisor.load_vectorstore("faiss_insurance_db")
    except:
        print("Please run the ingestion process first:")
        print("from insurance_rag_langchain import InsuranceRAGIngester")
        print("ingester = InsuranceRAGIngester()")
        print("ingester.ingest_documents('insurance_rag_documents.json', 'faiss_insurance_db')")
        return

    # Create retrieval chain
    advisor.create_retrieval_chain()

    print("AI Insurance Advisor ready!")
    print()

    # Example usage - Get recommendations
    print("=== Getting Top 3 Insurance Recommendations ===")
    recommendations = advisor.get_insurance_recommendations()
    print(recommendations)
    print()

    # Example usage - Analyze specific insurer
    print("=== Analyzing LIC ===")
    lic_analysis = advisor.analyze_specific_insurer("Life Insurance Corporation of India")
    print(lic_analysis)
    print()

    # Start interactive chat
    chat = InteractiveInsuranceChat(advisor)
    chat.start_chat()

if __name__ == "__main__":
    main()


Initializing AI Insurance Advisor...
Vectorstore loaded from faiss_insurance_db
AI Insurance Advisor ready!

=== Getting Top 3 Insurance Recommendations ===
