In [None]:
import os

# ðŸš¨ PASTE YOUR COPIED KEY HERE, inside the quotation marks ðŸš¨
os.environ["GEMINI_API_KEY"] = "#########################################"

print("API Key set successfully in environment variable.")

API Key set successfully in environment variable.


In [None]:
%%writefile verifier_agent.py

import json
import time
import requests
import os
from typing import Dict, List, Any, Tuple

# --- API Configuration ---
API_KEY = os.getenv("GEMINI_API_KEY", "")

if not API_KEY:
    print("CRITICAL ERROR: GEMINI_API_KEY environment variable is not set. API calls will fail.")

API_URL = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash-preview-09-2025:generateContent?key=" + API_KEY

def call_gemini_api_with_retry(payload, max_retries=3):
    """Handles API calls with basic exponential backoff."""
    for attempt in range(max_retries):
        try:
            if not API_KEY:
                raise requests.exceptions.HTTPError("403 Client Error: Forbidden (API Key is missing)")

            response = requests.post(API_URL, headers={'Content-Type': 'application/json'}, json=payload)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.HTTPError as e:
            if "API Key is missing" in str(e):
                raise

            if response.status_code == 429 and attempt < max_retries - 1:
                wait_time = 2 ** attempt
                time.sleep(wait_time)
            else:
                raise

def verify_claim_with_grounding(search_query, claim_text, top_k: int = 5) -> Tuple[str, List[Dict[str, str]]]:
    """
    Calls the Gemini API, enabling the Google Search tool to verify a claim.
    Returns a summary and a list of the top-k source dictionaries.

    Args:
        search_query: The query used to search the web.
        claim_text: The claim to be verified.
        top_k: The maximum number of sources to return.

    Returns:
        A tuple containing the summary verdict string and the list of sources.
    """
    system_prompt = (
        "You are a Source Verifier Agent. Use Google Search grounding to find evidence "
        "that either strongly corroborates or strongly contradicts the provided claim. "
        "Summarize the findings concisely and state whether the claim is: CORROBORATED, CONTRADICTED, or UNVERIFIED. "
        "Do not invent information."
    )
    user_query = f"Verify the following claim: '{claim_text}'. Search using the query: '{search_query}'"

    payload = {
        "contents": [{ "parts": [{ "text": user_query }] }],
        "systemInstruction": { "parts": [{ "text": system_prompt }] },
        # CRITICAL: Enable Google Search Grounding for evidence retrieval
        "tools": [{ "google_search": {} }],
    }

    try:
        response = call_gemini_api_with_retry(payload)
        candidate = response.get('candidates', [{}])[0]

        # 1. Extract generated text summary
        summary = candidate.get('content', {}).get('parts', [{}])[0].get('text', 'Verification failed: No summary generated.')

        # 2. Extract grounding sources (citations) and limit to top_k
        sources = []
        grounding_metadata = candidate.get('groundingMetadata')
        if grounding_metadata and grounding_metadata.get('groundingAttributions'):
            # Extract all valid sources
            all_sources = [
                {'uri': attr.get('web', {}).get('uri'), 'title': attr.get('web', {}).get('title')}
                for attr in grounding_metadata['groundingAttributions']
                if attr.get('web', {}).get('uri')
            ]
            # Limit the sources to top_k
            sources = all_sources[:top_k]

        return summary, sources

    except requests.exceptions.HTTPError as e:
        return f"Verification failed due to API error: {e}", []
    except Exception as e:
        return f"Verification failed due to generic error: {e}", []

def normalize_verdict_text(summary):
    """
    Strips common formatting (newlines, asterisks, colons, punctuation) to ensure
    robust keyword matching for CONTRADICTED/UNVERIFIED/CORROBORATED.
    """
    # Convert to upper case
    text = summary.upper()
    # Remove common markdown bolding, punctuation, and newlines that interfere with simple matching
    text = text.replace('*', '').replace(':', '').replace('\n', ' ').replace('.', ' ')
    return text

def calculate_verification_score(claims):
    """
    Implements the Source Reliability Heuristic (TM 3 Step 2).
    A high score (closer to 1.0) indicates that the claims are highly CONTRADICTED (i.e., high risk of fake).
    """
    risk_count = 0

    print("\n--- DEBUGGING SCORING LOGIC ---")
    for i, claim in enumerate(claims):
        summary = claim.get('corroboration_summary', '')
        # Use the robust normalization function
        normalized_verdict = normalize_verdict_text(summary)

        # Print the claim type and the normalized string being processed
        print(f"Claim {i+1} ({claim.get('category')}): Normalized Verdict String: '{normalized_verdict[:100]}...'")

        # A claim contributes to the risk score if it is explicitly CONTRADICTED or UNVERIFIED.
        # Check for CONTRADICTED first, as it's the highest risk factor.
        if "CONTRADICTED" in normalized_verdict:
            risk_count += 1
            print(f"  -> MATCH: CONTRADICTED. Risk Count: {risk_count}")
        elif "UNVERIFIED" in normalized_verdict:
            risk_count += 1
            print(f"  -> MATCH: UNVERIFIED. Risk Count: {risk_count}")
        else:
            print("  -> NO MATCH (CORROBORATED/N/A).")

    total_claims = len(claims)
    if total_claims == 0:
        return 0.5

    # Score = proportion of risky claims (Contradicted or Unverified)
    verification_score = risk_count / total_claims if total_claims > 0 else 0.5

    print(f"Total Claims: {total_claims}. Final Risk Count: {risk_count}. Base Score: {verification_score:.4f}")

    # Simple Contradiction Logic (TM 3 Step 3): Increase score if critical policy is contradicted
    # Use the same normalization logic here
    has_contradicted_policy = any("CONTRADICTED" in normalize_verdict_text(c.get('corroboration_summary', '')) and c.get('category') == 'POLICY' for c in claims)
    if has_contradicted_policy:
        print("ALERT: CRITICAL POLICY CLAIM WAS CONTRADICTED. ADJUSTING SCORE.")
        verification_score = min(1.0, verification_score + 0.2)

    return verification_score

def tm3_verifier_agent(state):
    """
    Implements the Evidence Retrieval and Source Reliability Agent logic.
    """
    print("--- TM 3 (Verifier Agent): Running Evidence Retrieval with Google Search Grounding ---")

    if not state['claims']:
        print("No claims extracted by TM 2. Setting Verification Score to neutral (0.5).")
        state['verification_score'] = 0.5
        return state

    for i, claim in enumerate(state['claims']):
        print(f"-> Verifying Claim {i+1} ({claim['category']}): {claim['claim_text']}")

        # Note: We keep the top_k default at 5 here. We could pass it as a parameter if needed.
        summary, sources = verify_claim_with_grounding(claim['search_query'], claim['claim_text'], top_k=5)

        # Update the claim object in the state
        claim['corroboration_summary'] = summary
        claim['sources'] = sources

        print(f"   Summary: {summary.splitlines()[0]}...")
        print(f"   Sources Found: {len(sources)}")

    # Recalculate the overall Verification Score based on the updated claims
    state['verification_score'] = calculate_verification_score(state['claims'])

    print(f"Final Verification Score: {state['verification_score']:.4f}")
    return state

Overwriting verifier_agent.py


the following is for checking the evidence retrieval agent indepently

In [None]:
import sys
import os
import json

# Ensure the core agent file can be imported
try:
    from verifier_agent import tm3_verifier_agent
except ImportError:
    print("Error: Could not find 'verifier_agent.py'. Make sure it is in the same directory.")
    sys.exit(1)

def mock_state_with_claims():
    """
    Creates a mock 'AssessmentState' object that simulates the output
    from a fully functional TM 2 (Claim Extraction Agent).
    """
    print("--- Initializing Mock State (Simulating TM 2 Output) ---")

    # These claims simulate the JSON structure expected from TM 2
    mock_claims = [
        {
            "claim_text": "We are acquiring BananaCo for $500 million starting next Tuesday.",
            "category": "FINANCIAL",
            "search_query": "BananaCo acquisition $500 million official announcement",
            "priority": 1
        },
        {
            "claim_text": "Hello, this is CEO John Smith.",
            "category": "IDENTITY",
            "search_query": "CEO John Smith official bio",
            "priority": 2
        },
        {
            "claim_text": "Our policy is to never wire funds over the phone.",
            "category": "POLICY",
            "search_query": "Company policy on wire transfer requests via voice",
            "priority": 1
        }
    ]

    mock_state = {
        "audio_url": "mock://test_audio.wav",
        "claimed_speaker_id": "John Smith",
        "transcript": "Mock transcript content...",
        "claims": mock_claims,
        "verification_score": None
    }

    return mock_state

def run_tm3_isolation_test():
    """
    Runs only the TM 3 agent on mock data and prints the result.
    """
    state = mock_state_with_claims()

    # Call your TM 3 function
    print("\n--- Executing TM 3 Verifier Agent (Evidence Retrieval) ---")
    final_state = tm3_verifier_agent(state)

    # Print the final, verified state
    print("\n--- FINAL TM 3 OUTPUT (Verified Claims & Score) ---")
    print(f"Overall Verification Score: {final_state['verification_score']:.4f}")

    # Print detailed claim verification results
    for i, claim in enumerate(final_state['claims']):
        print(f"\nClaim {i+1}: {claim['claim_text']}")
        print(f"  Summary: {claim.get('corroboration_summary', 'N/A')}")
        print(f"  Sources: {len(claim.get('sources', []))} found.")
        if claim.get('sources'):
            print(f"  Example Source: {claim['sources'][0]['title']} ({claim['sources'][0]['uri']})")

    # This is the final JSON structure for debugging:
    print("\nFULL STATE OUTPUT:")
    print(json.dumps(final_state, indent=2))


if __name__ == "__main__":
    run_tm3_isolation_test()

--- Initializing Mock State (Simulating TM 2 Output) ---

--- Executing TM 3 Verifier Agent (Evidence Retrieval) ---
--- TM 3 (Verifier Agent): Executing Evidence Retrieval ---
-> Verifying Claim 1 (FINANCIAL): We are acquiring BananaCo for $500 million starting next Tuesday.
   Summary: The search for 'BananaCo acquisition $500 million official announcement' did not yield any results confirming an official announcement for the acquisition of BananaCo for \$500 million starting next Tuesday....
-> Verifying Claim 2 (IDENTITY): Hello, this is CEO John Smith.
   Summary: Multiple individuals named John Smith are or have been chief executive officers....
-> Verifying Claim 3 (POLICY): Our policy is to never wire funds over the phone.
   Summary: The claim "Our policy is to never wire funds over the phone" is **UNVERIFIED** as a general or industry-wide policy....
Final Verification Score: 1.0000

--- FINAL TM 3 OUTPUT (Verified Claims & Score) ---
Overall Verification Score: 1.0000

Claim

In [None]:
%%writefile source_reliability_agent.py

import json
import time
import requests
import os
from typing import Dict, List, Any, Tuple

# --- API Configuration ---
API_KEY = os.getenv("GEMINI_API_KEY", "")

if not API_KEY:
    print("CRITICAL ERROR: GEMINI_API_KEY environment variable is not set. API calls may fail.")

API_URL = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash-preview-09-2025:generateContent?key=" + API_KEY

def call_gemini_api_with_retry(payload: Dict[str, Any], max_retries: int = 3) -> Dict[str, Any]:
    """Handles API calls with basic exponential backoff."""
    for attempt in range(max_retries):
        try:
            if not API_KEY:
                raise requests.exceptions.HTTPError("403 Client Error: Forbidden (API Key is missing)")

            response = requests.post(API_URL, headers={'Content-Type': 'application/json'}, json=payload)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.HTTPError as e:
            if "API Key is missing" in str(e):
                raise

            if (response.status_code == 429 or response.status_code >= 500) and attempt < max_retries - 1:
                wait_time = 2 ** attempt
                time.sleep(wait_time)
            else:
                raise
        except Exception as e:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt
                time.sleep(wait_time)
            else:
                raise

def normalize_verdict_text(summary: str) -> str:
    """Strips common formatting and punctuation for robust keyword matching."""
    text = summary.upper()
    text = text.replace('*', '').replace(':', '').replace('\n', ' ').replace('.', ' ')
    return text

def tm3_source_reliability_agent(state: Dict[str, Any]) -> Dict[str, Any]:
    """
    LangGraph Node for Source Reliability Analysis (TM 3 - Step 2).
    Uses LLM reasoning to adjust the base score based on evidence quality.
    """
    print("--- TM 3 (Source Reliability Agent): Executing Score Adjustment ---")

    if not state.get('reliability_analysis_required'):
        print("Skipping reliability analysis (no claims to process).")
        return state

    initial_score = state.get('verification_score', 0.5)

    # 1. Identify claims that have a clear verdict (CORROBORATED or CONTRADICTED) AND sources
    citable_claims = []
    for claim in state['claims']:
        verdict = normalize_verdict_text(claim.get('corroboration_summary', '')).strip()

        # Only analyze claims where the verdict is NOT UNVERIFIED, and sources exist
        if ("CORROBORATED" in verdict or "CONTRADICTED" in verdict) and claim.get('sources'):
             citable_claims.append({
                "claim": claim['claim_text'],
                "verdict": claim['corroboration_summary'],
                "sources": claim['sources']
            })

    if not citable_claims:
        print("Source Reliability: No high-confidence, citable claims found for score adjustment.")
        return state

    # 2. Format the prompt for the LLM Analyst
    prompt = (
        "You are a Source Reliability Analyst. Your task is to analyze the provided claim verification summaries "
        "and their supporting sources (URIs/Titles). For each claim, determine the overall reliability of the evidence. "
        "If the sources are official (e.g., government, major press, corporate press release), reliability is HIGH. "
        "If sources are weak (e.g., forums, personal blogs, low-tier news), reliability is LOW. "
        "Analyze the quality of the evidence for the following claims: "
        f"{json.dumps(citable_claims, indent=2)}"
        "\n\nBased on the quality of ALL analyzed evidence, provide ONE final verdict (HIGH, MEDIUM, or LOW) for the overall reliability of the combined evidence."
    )

    payload = {
        "contents": [{ "parts": [{ "text": prompt }] }],
        "systemInstruction": { "parts": [{ "text": "Analyze source reliability and provide a single word verdict: HIGH, MEDIUM, or LOW." }] },
    }

    # 3. Call the LLM Analyst
    try:
        response = call_gemini_api_with_retry(payload)
        llm_verdict = response.get('candidates', [{}])[0].get('content', {}).get('parts', [{}])[0].get('text', '').upper().strip()

        adjustment = 0.0

        if "LOW" in llm_verdict:
            # Low quality sources makes the verification verdict less trustworthy -> increase risk
            adjustment = 0.05
            print(f"Source Reliability Verdict: LOW. Applying +{adjustment:.2f} risk adjustment.")
        elif "HIGH" in llm_verdict:
            # High quality sources makes the verification verdict highly trustworthy -> decrease risk
            adjustment = -0.05
            print(f"Source Reliability Verdict: HIGH. Applying {adjustment:.2f} risk adjustment.")
        else:
            print("Source Reliability Verdict: MEDIUM (No adjustment).")

        final_score = max(0.0, min(1.0, initial_score + adjustment))
        state['verification_score'] = final_score

    except Exception as e:
        print(f"Source Reliability analysis failed: {e}. Keeping initial score: {initial_score:.4f}.")
        # If the LLM call fails, we revert to the base score
        state['verification_score'] = initial_score

    print(f"Final Verification Score (Adjusted): {state['verification_score']:.4f}")
    return state

Writing source_reliability_agent.py
