In [3]:
import json
import os
import time
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import TypedDict, Annotated, List, Dict, Any, Optional
from urllib.parse import urlparse

from dotenv import load_dotenv
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_google_genai import ChatGoogleGenerativeAI
from langgraph.graph import StateGraph, END
from pydantic import BaseModel, Field
from tavily import TavilyClient
load_dotenv()

True

In [4]:
class SearchDepth(Enum):
    """Depth setting for a search request.

    Each member's ``value`` is the plain string that is forwarded as the
    ``search_depth`` argument of the Tavily client's ``search`` call.
    """

    # Shallow search.
    BASIC = "basic"
    # Deeper search; used as the default in SearchConfig.
    ADVANCED = "advanced"

@dataclass
class SearchConfig:
    """Tunable settings for a single search-and-verify run.

    Field order and defaults are part of the constructor interface and must
    stay as they are.
    """

    # Upper bound on the number of results requested from the search client.
    max_results: int = 5
    # Search depth; its ``.value`` string is what gets sent to the client.
    search_depth: SearchDepth = SearchDepth.ADVANCED
    # When False, the verifier returns a neutral "Verification disabled" result.
    enable_verification: bool = True
    # NOTE(review): not read by any code visible in this notebook — confirm
    # whether it is still needed or should gate the confidence level.
    verification_threshold: float = 0.7
    # Verification is skipped (low confidence) when fewer results are found.
    min_sources_for_verification: int = 3

class VerificationResult(BaseModel):
    """Outcome of cross-checking the search results against each other.

    Container fields use ``default_factory`` so every instance visibly gets
    its own fresh list/dict instead of relying on a shared literal default.
    """

    # Overall consistency in the closed range [0, 1]; validated by pydantic.
    consistency_score: float = Field(..., ge=0, le=1)
    # One of "high", "medium", "low", or "unknown" (the latter is used when
    # verification is disabled or could not be run).
    confidence_level: str
    # Claims the consistency analysis flagged as contradictory.
    conflicting_claims: List[str] = Field(default_factory=list)
    # URLs of the sources that were taken into account.
    supporting_sources: List[str] = Field(default_factory=list)
    # Human-readable remarks about how the verification went.
    verification_notes: List[str] = Field(default_factory=list)
    # Raw consistency-analysis payload, kept for inspection.
    fact_checks: Dict[str, Any] = Field(default_factory=dict)

class SearchResponse(BaseModel):
    """Everything returned to the caller for one processed query."""

    # The query string exactly as the caller supplied it.
    query: str
    # Final answer text produced by the answer step (or an error message).
    answer: str
    # Short answer returned by the search client itself, when present.
    tavily_answer: Optional[str] = None
    # Number of entries in the "results" list of the search payload.
    source_count: int
    # Raw search payload as stored in the workflow state.
    search_results: Dict[str, Any]
    # Verification outcome; None when the workflow produced none.
    verification: Optional[VerificationResult] = None
    # Moment the response object was assembled.
    timestamp: datetime
    # Wall-clock time spent on the whole request, in milliseconds.
    duration_ms: int

class AgentState(TypedDict):
    """Shared state handed from node to node in the agent workflow.

    Every key carries a short human-readable description as ``Annotated``
    metadata.
    """

    # Input supplied by the caller.
    query: Annotated[str, "The user's search query"]
    # Filled in by the search step.
    search_results: Annotated[Dict, "Search results from Tavily"]
    # Filled in by the verification step.
    verification_result: Annotated[Optional[VerificationResult], "Verification results"]
    # Filled in by the answer step.
    final_answer: Annotated[str, "Final answer to the user"]
    # Per-request settings.
    config: Annotated[SearchConfig, "Search configuration"]
    # time.time() value captured when the request began.
    start_time: Annotated[float, "Request start time"]


In [5]:
class InformationVerifier:
    """Verify information consistency across multiple sources.

    The verifier asks the LLM to extract factual claims from each search
    result, asks it again to compare those claims across sources, and blends
    that outcome with a heuristic per-source credibility score.
    """

    # Limits on what is sent to the LLM during fact extraction.
    MAX_SOURCES_FOR_EXTRACTION = 5
    MAX_CONTENT_CHARS = 800
    # Number of leading result URLs reported as supporting sources.
    MAX_SUPPORTING_SOURCES = 3

    # Hostname labels that earn the domain-based credibility bonus.
    INSTITUTIONAL_SUFFIXES = frozenset({"edu", "gov", "org"})
    COMMERCIAL_SUFFIXES = frozenset({"com", "net"})
    # Registrable domains that earn the "known credible source" bonus.
    CREDIBLE_DOMAINS = (
        "wikipedia.org", "pubmed.ncbi.nlm.nih.gov", "who.int",
        "cdc.gov", "nih.gov", "reuters.com", "bbc.com", "nature.com",
    )

    # Keys of the consistency analysis that must always hold lists of strings.
    ANALYSIS_LIST_KEYS = (
        "consistent_facts", "inconsistent_facts", "unique_facts", "confidence_notes",
    )

    def __init__(self, llm):
        """Store the chat model used for fact extraction and comparison.

        Args:
            llm: Object exposing ``invoke(messages)`` whose return value has a
                ``content`` string attribute.
        """
        self.llm = llm

    def verify_information(self, query: str, search_results: Dict, config: SearchConfig) -> VerificationResult:
        """Verify information consistency across sources.

        Args:
            query: The user's question; used to focus fact extraction.
            search_results: Search payload; its ``"results"`` list is inspected.
            config: Controls whether verification runs and how many sources
                are required.

        Returns:
            A VerificationResult. This method does not raise: any failure
            during analysis is reported as a low-confidence result whose notes
            contain the error text.
        """
        if not config.enable_verification:
            return VerificationResult(
                consistency_score=0.5,
                confidence_level="unknown",
                verification_notes=["Verification disabled"]
            )

        # `or []` also covers an explicit None under the "results" key.
        results = search_results.get("results") or []
        if len(results) < config.min_sources_for_verification:
            return VerificationResult(
                consistency_score=0.3,
                confidence_level="low",
                verification_notes=[f"Insufficient sources for verification (found {len(results)}, need {config.min_sources_for_verification})"]
            )

        try:
            facts_by_source = self._extract_facts_from_sources(query, results)
            consistency_analysis = self._analyze_consistency(facts_by_source)
            credibility_scores = self._evaluate_source_credibility(results)
            overall_score = self._calculate_consistency_score(consistency_analysis, credibility_scores)
            confidence_level = self._determine_confidence_level(overall_score)

            return VerificationResult(
                consistency_score=overall_score,
                confidence_level=confidence_level,
                conflicting_claims=consistency_analysis.get("inconsistent_facts", []),
                # A missing/None URL becomes "" so the List[str] field validates.
                supporting_sources=[
                    result.get("url") or ""
                    for result in results[:self.MAX_SUPPORTING_SOURCES]
                ],
                verification_notes=self._generate_verification_notes(consistency_analysis, credibility_scores),
                fact_checks=consistency_analysis
            )

        except Exception as e:
            # Verification is advisory: degrade to a low-confidence result
            # instead of aborting the whole request.
            return VerificationResult(
                consistency_score=0.4,
                confidence_level="low",
                verification_notes=[f"Verification failed: {str(e)}"]
            )

    @staticmethod
    def _strip_code_fence(text: str) -> str:
        """Remove a surrounding Markdown code fence (``` or ```json), if any.

        The closing fence is only removed when it is actually present, so an
        unterminated fence no longer eats the last characters of the payload.
        """
        cleaned = text.strip()
        if cleaned.startswith("```"):
            cleaned = cleaned[3:]
            if cleaned[:4].lower() == "json":
                cleaned = cleaned[4:]
            if cleaned.endswith("```"):
                cleaned = cleaned[:-3]
        return cleaned.strip()

    @classmethod
    def _parse_fact_list(cls, text: str) -> List[str]:
        """Parse an LLM reply into a list of non-empty fact strings.

        Raises:
            ValueError: If the reply is not valid JSON (json.JSONDecodeError
                is a ValueError subclass) or is not a JSON list.
        """
        parsed = json.loads(cls._strip_code_fence(text))
        if not isinstance(parsed, list):
            raise ValueError("expected a JSON list of facts")

        facts = []
        for item in parsed:
            # Non-string items are serialised so downstream code only sees str.
            fact = item.strip() if isinstance(item, str) else json.dumps(item, default=str)
            if fact:
                facts.append(fact)
        return facts

    @classmethod
    def _normalize_analysis(cls, raw: Any) -> Dict[str, Any]:
        """Coerce the LLM's consistency analysis into the expected shape.

        Every key in ANALYSIS_LIST_KEYS is guaranteed to be a list of strings
        afterwards; any additional keys are passed through unchanged.

        Raises:
            ValueError: If ``raw`` is not a JSON object (dict).
        """
        if not isinstance(raw, dict):
            raise ValueError("expected a JSON object for the consistency analysis")

        normalized = dict(raw)
        for key in cls.ANALYSIS_LIST_KEYS:
            value = raw.get(key)
            if value is None:
                items = []
            elif isinstance(value, (list, tuple)):
                items = list(value)
            else:
                items = [value]
            normalized[key] = [
                item if isinstance(item, str) else json.dumps(item, default=str)
                for item in items
            ]
        return normalized

    def _extract_facts_from_sources(self, query: str, results: List[Dict]) -> Dict[str, List[str]]:
        """Extract key facts from each source.

        Only the first MAX_SOURCES_FOR_EXTRACTION results are processed and
        each one's content is truncated to MAX_CONTENT_CHARS characters.
        Results without content are skipped.

        Returns:
            Mapping of ``"source_<index>"`` to the facts extracted from that
            result; a source whose extraction failed maps to an empty list.
        """
        fact_extraction_prompt = ChatPromptTemplate.from_messages([
            ("system", """Extract key factual claims from the text that are related to the query.

                Rules:
                1. Extract only verifiable factual statements
                2. Ignore opinions or speculation
                3. Focus on facts directly related to the query
                4. Return as a JSON list of strings
                5. Keep facts concise and specific

                Query: {query}
                Text: {content}

                Return format: ["fact1", "fact2", "fact3"]"""),
            ("human", "Extract facts")
        ])

        facts_by_source: Dict[str, List[str]] = {}

        for i, result in enumerate(results[:self.MAX_SOURCES_FOR_EXTRACTION]):
            content = (result.get("content") or "")[:self.MAX_CONTENT_CHARS]
            if not content.strip():
                continue

            try:
                response = self.llm.invoke(
                    fact_extraction_prompt.format_messages(query=query, content=content)
                )
                facts_by_source[f"source_{i}"] = self._parse_fact_list(response.content)
            except Exception:
                # Best effort: one unreadable source must not abort the rest.
                facts_by_source[f"source_{i}"] = []

        return facts_by_source

    def _analyze_consistency(self, facts_by_source: Dict[str, List[str]]) -> Dict[str, Any]:
        """Analyze consistency across extracted facts.

        The facts are sent to the LLM grouped by source id; without that
        attribution the model cannot tell which facts are backed by several
        sources and which come from a single one.

        Returns:
            Dict with list-of-string values under "consistent_facts",
            "inconsistent_facts" and "unique_facts" (plus "confidence_notes"
            whenever the LLM was consulted).
        """
        facts_with_sources = {
            source: facts for source, facts in facts_by_source.items() if facts
        }
        if not facts_with_sources:
            return {"consistent_facts": [], "inconsistent_facts": [], "unique_facts": []}

        consistency_prompt = ChatPromptTemplate.from_messages([
            ("system", """Analyze these facts for consistency and contradictions.

                The facts are given as a JSON object that maps each source id to the facts extracted from that source.

                Instructions:
                1. Group similar or related facts together
                2. Identify contradictions or inconsistencies
                3. Note facts that appear across multiple sources
                4. Identify unique facts from single sources

                Facts: {facts}

                Return JSON:
                {{
                    "consistent_facts": ["facts supported by multiple sources"],
                    "inconsistent_facts": ["contradictory facts"],
                    "unique_facts": ["facts from single sources"],
                    "confidence_notes": ["explanations"]
                }}"""),
            ("human", "Analyze consistency")
        ])

        try:
            response = self.llm.invoke(
                consistency_prompt.format_messages(facts=json.dumps(facts_with_sources, indent=2))
            )
            return self._normalize_analysis(
                json.loads(self._strip_code_fence(response.content))
            )

        except Exception:
            # Nothing was cross-checked, so the facts are reported as
            # unverified ("unique") rather than as consistent; labelling them
            # consistent would inflate the consistency score.
            all_facts = [fact for facts in facts_with_sources.values() for fact in facts]
            return {
                "consistent_facts": [],
                "inconsistent_facts": [],
                "unique_facts": all_facts,
                "confidence_notes": ["Analysis failed"]
            }

    @staticmethod
    def _hostname(url: str) -> str:
        """Return the lower-cased hostname of ``url``, or "" if none is found."""
        candidate = url.strip().lower()
        if not candidate:
            return ""
        if "://" not in candidate:
            # urlparse only recognises a host after "//".
            candidate = "//" + candidate
        try:
            return urlparse(candidate).hostname or ""
        except ValueError:
            return ""

    @staticmethod
    def _suffix_labels(host: str) -> set:
        """Return the hostname labels that classify the domain.

        That is the top-level label and, for two-letter country-code hosts
        with at least three labels (e.g. "www.service.gov.uk"), the label in
        front of it as well.
        """
        parts = host.split(".")
        labels = {parts[-1]}
        if len(parts) >= 3 and len(parts[-1]) == 2:
            labels.add(parts[-2])
        return labels

    def _evaluate_source_credibility(self, results: List[Dict]) -> Dict[str, float]:
        """Simple source credibility scoring.

        Domain checks are made against the parsed hostname, not the raw URL,
        so a path or sub-domain that merely contains ".edu" or "bbc.com" does
        not earn a bonus.

        Returns:
            Mapping of ``"source_<index>"`` to a score clamped to [0, 1].
        """
        credibility_scores = {}

        for i, result in enumerate(results):
            score = 0.5  # Base score
            host = self._hostname(result.get("url") or "")
            suffixes = self._suffix_labels(host)

            # Domain-based scoring
            if suffixes & self.INSTITUTIONAL_SUFFIXES:
                score += 0.3
            elif suffixes & self.COMMERCIAL_SUFFIXES:
                score += 0.1

            # Known credible sources (exact domain or any sub-domain of it)
            if any(host == domain or host.endswith("." + domain) for domain in self.CREDIBLE_DOMAINS):
                score += 0.2

            # Content quality
            if len(result.get("content") or "") > 500:
                score += 0.1

            # Tavily relevance score; missing or malformed values count as 0
            try:
                relevance = float(result.get("score") or 0)
            except (TypeError, ValueError):
                relevance = 0.0
            score += min(relevance * 0.1, 0.1)

            credibility_scores[f"source_{i}"] = min(max(score, 0.0), 1.0)

        return credibility_scores

    def _calculate_consistency_score(self, consistency_analysis: Dict, credibility_scores: Dict) -> float:
        """Calculate overall consistency score.

        The score is 60% the share of consistent facts among all judged facts
        (0.5 when none were judged) plus 40% the mean source credibility
        (0.5 when there are no sources), minus 0.1 per inconsistent fact
        (capped at 0.3), clamped to [0, 1].
        """
        consistent_facts = len(consistency_analysis.get("consistent_facts") or [])
        inconsistent_facts = len(consistency_analysis.get("inconsistent_facts") or [])

        judged_facts = consistent_facts + inconsistent_facts
        consistency_ratio = consistent_facts / judged_facts if judged_facts else 0.5

        avg_credibility = sum(credibility_scores.values()) / len(credibility_scores) if credibility_scores else 0.5
        conflict_penalty = min(inconsistent_facts * 0.1, 0.3)

        final_score = (consistency_ratio * 0.6 + avg_credibility * 0.4) - conflict_penalty
        return min(max(final_score, 0.0), 1.0)

    def _determine_confidence_level(self, score: float) -> str:
        """Map a score to "high" (>= 0.8), "medium" (>= 0.6) or "low"."""
        if score >= 0.8:
            return "high"
        if score >= 0.6:
            return "medium"
        return "low"

    def _generate_verification_notes(self, consistency_analysis: Dict, credibility_scores: Dict) -> List[str]:
        """Generate human-readable notes summarising the verification."""
        consistent_count = len(consistency_analysis.get("consistent_facts") or [])
        inconsistent_count = len(consistency_analysis.get("inconsistent_facts") or [])

        notes = [f"Found {consistent_count} consistent facts across sources"]

        if inconsistent_count > 0:
            notes.append(f"Detected {inconsistent_count} potential contradictions")

        avg_credibility = sum(credibility_scores.values()) / len(credibility_scores) if credibility_scores else 0
        notes.append(f"Average source credibility: {avg_credibility:.2f}")

        return notes



In [9]:
class SearchAgent:
    """Simple search agent with information verification.

    Runs a three-step workflow (search → verify → answer) as a compiled
    LangGraph state graph over an AgentState dict.
    """

    def __init__(self, model: str = "gemini-1.5-pro", temperature: float = 0.1):
        """Create the LLM, the search client, the verifier and the workflow.

        API keys are read from the GEMINI_API_KEY and TAVILY_API_KEY
        environment variables.

        Args:
            model: Gemini model name passed to ChatGoogleGenerativeAI.
            temperature: Sampling temperature passed to the model.
        """
        # Initialize LLM and clients
        self.llm = ChatGoogleGenerativeAI(
            model=model,
            temperature=temperature,
            google_api_key=os.getenv("GEMINI_API_KEY")
        )

        self.tavily_client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))
        self.verifier = InformationVerifier(self.llm)

        # Create the graph
        self.graph = self._create_graph()

    def _search_node(self, state: AgentState) -> AgentState:
        """Execute the Tavily search and store the payload in the state.

        On failure the state receives an empty result list together with an
        ``"error"`` entry, so the following nodes can still run.
        """
        query = state["query"]
        config = state["config"]

        try:
            search_results = self.tavily_client.search(
                query=query,
                search_depth=config.search_depth.value,
                max_results=config.max_results,
                include_answer=True,
                include_raw_content=False
            )

            state["search_results"] = search_results
            print(f"✓ Search completed: {len(search_results.get('results', []))} sources found")

        except Exception as e:
            print(f"✗ Search failed: {e}")
            state["search_results"] = {"error": str(e), "results": [], "answer": ""}

        return state

    def _verify_node(self, state: AgentState) -> AgentState:
        """Verify information consistency and store the result in the state."""
        query = state["query"]
        search_results = state["search_results"]
        config = state["config"]

        try:
            verification_result = self.verifier.verify_information(query, search_results, config)
            state["verification_result"] = verification_result

            print(f"✓ Verification completed: {verification_result.confidence_level} confidence ({verification_result.consistency_score:.2f})")

            if verification_result.conflicting_claims:
                print(f"⚠ Conflicts detected: {len(verification_result.conflicting_claims)}")

        except Exception as e:
            print(f"✗ Verification failed: {e}")
            state["verification_result"] = VerificationResult(
                consistency_score=0.5,
                confidence_level="unknown",
                verification_notes=[f"Verification failed: {e}"]
            )

        return state

    @staticmethod
    def _build_verification_context(verification_result) -> str:
        """Render the verification outcome as prompt text ("" when absent)."""
        if not verification_result:
            return ""

        if verification_result.conflicting_claims:
            conflict_line = f"⚠ CONFLICTS DETECTED: {verification_result.conflicting_claims}"
        else:
            conflict_line = "✓ No major conflicts detected"

        return "\n".join([
            "VERIFICATION ANALYSIS:",
            f"- Consistency Score: {verification_result.consistency_score:.2f}",
            f"- Confidence Level: {verification_result.confidence_level}",
            f"- Sources Analyzed: {len(verification_result.supporting_sources)}",
            "",
            conflict_line,
            "",
            f"Notes: {'; '.join(verification_result.verification_notes)}",
        ])

    def _answer_node(self, state: AgentState) -> AgentState:
        """Generate the final answer with verification context.

        The prompt is built whether or not a verification result exists;
        previously it was only defined when one was present, so the missing
        case surfaced as a swallowed NameError.
        """
        query = state["query"]
        search_results = state["search_results"]
        verification_result = state.get("verification_result")

        # Format search results
        if "error" in search_results:
            results_text = f"Search Error: {search_results['error']}"
        else:
            results_text = self._format_search_results(search_results)

        # Add verification context (empty string when nothing was verified)
        verification_context = self._build_verification_context(verification_result)

        answer_prompt = ChatPromptTemplate.from_messages([
            ("system", """You are an AI research assistant with fact-checking capabilities. Provide accurate answers based on search results and verification analysis.

            IMPORTANT GUIDELINES:
            1. Use search results as your primary information source
            2. Consider the verification analysis when forming your answer
            3. If consistency score is LOW (< 0.6), mention uncertainty clearly
            4. If conflicts are detected, acknowledge them in your response
            5. Cite sources when possible
            6. Be transparent about limitations

            SEARCH RESULTS:
            {search_results}

            {verification_context}

            Provide a comprehensive answer that incorporates the verification insights."""),
            ("human", "Question: {query}")
        ])

        try:
            response = self.llm.invoke(
                answer_prompt.format_messages(
                    query=query,
                    search_results=results_text,
                    verification_context=verification_context
                )
            )

            state["final_answer"] = response.content
            print("✓ Answer generated with verification context")

        except Exception as e:
            # The error text becomes the answer so the caller always gets one.
            error_msg = f"Answer generation failed: {str(e)}"
            state["final_answer"] = error_msg
            print(f"✗ Answer generation failed: {e}")

        return state

    def _format_search_results(self, search_results: Dict) -> str:
        """Format search results as plain text for the LLM prompt.

        Missing or None titles, URLs and contents are replaced with
        placeholder text; content is cut to 300 characters.
        """
        pieces = []

        # Add Tavily AI answer
        answer = search_results.get("answer")
        if answer:
            pieces.append(f"AI SUMMARY: {answer}\n\n")

        # Add search results
        results = search_results.get("results")
        if results:
            pieces.append("SOURCES:\n")
            for i, result in enumerate(results, 1):
                title = result.get("title") or "Untitled"
                url = result.get("url") or "No URL"
                content = str(result.get("content") or "No content")
                pieces.append(f"{i}. {title}\n")
                pieces.append(f"   URL: {url}\n")
                pieces.append(f"   Content: {content[:300]}...\n\n")

        return "".join(pieces)

    def _create_graph(self) -> Any:
        """Create the workflow: search → verify → answer.

        Returns:
            The compiled graph (the result of ``StateGraph.compile()``), which
            is what ``search`` invokes.
        """
        workflow = StateGraph(AgentState)

        workflow.add_node("search", self._search_node)
        workflow.add_node("verify", self._verify_node)
        workflow.add_node("answer", self._answer_node)

        workflow.set_entry_point("search")
        workflow.add_edge("search", "verify")
        workflow.add_edge("verify", "answer")
        workflow.add_edge("answer", END)

        return workflow.compile()

    def search(self, query: str, config: Optional[SearchConfig] = None) -> SearchResponse:
        """Run the full workflow for one query.

        Args:
            query: The user's question.
            config: Optional settings; a default SearchConfig is used if omitted.

        Returns:
            A SearchResponse holding the answer, raw search payload,
            verification result, timestamp and elapsed time.
        """
        start_time = time.time()
        search_config = config or SearchConfig()

        print(f"\n🔍 Searching: {query}")
        print(f"📊 Verification: {'enabled' if search_config.enable_verification else 'disabled'}")

        # Prepare initial state
        initial_state = {
            "query": query,
            "search_results": {},
            "verification_result": None,
            "final_answer": "",
            "config": search_config,
            "start_time": start_time
        }

        # Execute workflow
        final_state = self.graph.invoke(initial_state)

        # Calculate duration
        duration_ms = int((time.time() - start_time) * 1000)

        # `or {}` also covers a None payload left in the state.
        search_results = final_state.get("search_results") or {}

        # Create response
        response = SearchResponse(
            query=query,
            answer=final_state.get("final_answer", ""),
            tavily_answer=search_results.get("answer"),
            source_count=len(search_results.get("results") or []),
            search_results=search_results,
            verification=final_state.get("verification_result"),
            timestamp=datetime.now(),
            duration_ms=duration_ms
        )

        print(f"⏱ Completed in {duration_ms}ms")
        return response

In [12]:
# Example usage: run each sample query through the agent and print a summary
if __name__ == "__main__":
    # One agent instance is reused for every query
    agent = SearchAgent()

    # Sample questions to send through search → verify → answer
    queries = [
        "What are the health benefits of intermittent fasting?",
    ]

    for query in queries:
        print("\n" + "=" * 80)

        # Verification is switched on explicitly for the demo
        result = agent.search(query, SearchConfig(enable_verification=True))

        print("\n📋 RESULTS SUMMARY:")
        print(f"Query: {result.query}")
        print(f"Sources: {result.source_count}")

        verification = result.verification
        if verification:
            print(f"Verification: {verification.confidence_level} confidence ({verification.consistency_score:.2f})")
            if verification.conflicting_claims:
                print(f"Conflicts: {len(verification.conflicting_claims)} detected")

        print("\n💬 ANSWER:")
        # Long answers are cut to 400 characters and marked with an ellipsis
        if len(result.answer) > 400:
            print(result.answer[:400] + "...")
        else:
            print(result.answer)



🔍 Searching: What are the health benefits of intermittent fasting?
📊 Verification: enabled
✗ Search failed: 403 Client Error: Forbidden for url: https://api.tavily.com/search
✓ Verification completed: low confidence (0.30)


Retrying langchain_google_genai.chat_models._chat_with_retry.<locals>._chat_with_retry in 2.0 seconds as it raised ResourceExhausted: 429 You exceeded your current quota, please check your plan and billing details. For more information on this error, head to: https://ai.google.dev/gemini-api/docs/rate-limits. [violations {
}
violations {
}
violations {
}
, links {
  description: "Learn more about Gemini API quotas"
  url: "https://ai.google.dev/gemini-api/docs/rate-limits"
}
, retry_delay {
  seconds: 33
}
].


✗ Answer generation failed: 429 You exceeded your current quota, please check your plan and billing details. For more information on this error, head to: https://ai.google.dev/gemini-api/docs/rate-limits. [violations {
}
violations {
}
violations {
}
, links {
  description: "Learn more about Gemini API quotas"
  url: "https://ai.google.dev/gemini-api/docs/rate-limits"
}
, retry_delay {
  seconds: 31
}
]
⏱ Completed in 4728ms

📋 RESULTS SUMMARY:
Query: What are the health benefits of intermittent fasting?
Sources: 0
Verification: low confidence (0.30)

💬 ANSWER:
Answer generation failed: 429 You exceeded your current quota, please check your plan and billing details. For more information on this error, head to: https://ai.google.dev/gemini-api/docs/rate-limits. [violations {
}
violations {
}
violations {
}
, links {
  description: "Learn more about Gemini API quotas"
  url: "https://ai.google.dev/gemini-api/docs/rate-limits"
}
, retry_delay {
  seconds: 3...
