In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Print the full path of every file found anywhere under the Kaggle input directory
for root, _, files in os.walk('/kaggle/input'):
    for name in files:
        print(os.path.join(root, name))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [1]:
!pip install -q \
    gradio \
    google-generativeai \
    SpeechRecognition \
    pydub \
    gTTS \
    langchain \
    langchain-google-genai \
    langchain_community \
    langchain-huggingface \
    faiss-cpu \
    ragas\
    langchain_tavily\
    beautifulsoup4 \
    selenium


[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m32.9/32.9 MB[0m [31m35.7 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.0/42.0 kB[0m [31m1.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.5/2.5 MB[0m [31m56.7 MB/s[0m eta [36m0:00:00[0m:00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.0/1.0 MB[0m [31m30.7 MB/s[0m eta [36m0:00:00[0m:00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m31.4/31.4 MB[0m [31m31.8 MB/s[0m eta [36m0:00:00[0m00:01[0m:00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m279.1/279.1 kB[0m [31m10.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m9.6/9.6 MB[0m [31m76.1 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m98.2/98.2 kB[0m [31m3.6 MB/s[0

In [4]:
import gradio as gr
import speech_recognition as sr
import os
import uuid
import logging
import requests
from bs4 import BeautifulSoup
import json
import collections
import concurrent.futures
import tempfile
import shutil
from gtts import gTTS
from langchain_community.vectorstores import FAISS
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.prompts import PromptTemplate, ChatPromptTemplate
from langchain_core.runnables import RunnableSequence
from langchain_core.output_parsers import StrOutputParser
from langchain_core.language_models import BaseChatModel
from langchain_tavily import TavilySearch
from kaggle_secrets import UserSecretsClient
import nltk
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# Fetch the NLTK "punkt" tokenizer data
nltk.download('punkt')

# --- CONFIGURATION ---
# Root logger: INFO and above, timestamped
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# Read both API keys from Kaggle Secrets. If anything in the lookup fails,
# BOTH keys are reset to None (even if the first one had been read).
try:
    user_secrets = UserSecretsClient()
    API_KEY = user_secrets.get_secret("GEMINI_API_KEY")
    TAVILY_API_KEY = user_secrets.get_secret("TAVILY_API_KEY")
except Exception as e:
    API_KEY = TAVILY_API_KEY = None
    logging.warning(f"Could not retrieve API keys from Kaggle Secrets: {e}. The application may not function.")
else:
    logging.info("Successfully retrieved API keys from Kaggle Secrets.")

# Export the Tavily key through the environment only when one was obtained
# (presumably read from there by the Tavily client — TODO confirm)
if TAVILY_API_KEY:
    os.environ["TAVILY_API_KEY"] = TAVILY_API_KEY

# --- INITIALIZE SERVICES ---
# Shared speech recognizer used by speech_to_text
recognizer = sr.Recognizer()

# Gemini chat model; `llm` stays None when construction fails
llm = None
try:
    llm = ChatGoogleGenerativeAI(
        model="gemini-2.5-flash",
        google_api_key=API_KEY,
        temperature=0.8,  # Increased temperature
    )
except Exception as e:
    logging.error(f"Failed to initialize Gemini LLM: {e}")

# FAISS store, created lazily by the pipeline on its first successful run
vector_db = None

# Sentence-embedding model; `embedding_function` stays None when loading fails
embedding_function = None
try:
    embedding_function = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
except Exception as e:
    logging.error(f"Failed to initialize embeddings: {e}")
else:
    logging.info("✅ HuggingFace embeddings initialized.")

# ================================
# SECTION 1: EVALUATION & LOGGING
# ================================
# Dedicated logger whose records are appended to fact_checks.log, one
# timestamped line per fact-check (see log_fact_check).
eval_logger = logging.getLogger("evaluation_logger")
eval_logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(message)s')

# logging.getLogger() returns the same logger object for a given name, so
# executing this code again (e.g. re-running the notebook cell) would stack
# one more FileHandler each time and write every entry multiple times.
# Reuse the handler that already targets the log file when there is one.
_eval_log_path = os.path.abspath("fact_checks.log")
file_handler = next(
    (
        handler
        for handler in eval_logger.handlers
        if isinstance(handler, logging.FileHandler)
        and handler.baseFilename == _eval_log_path
    ),
    None,
)
if file_handler is None:
    file_handler = logging.FileHandler("fact_checks.log")
    eval_logger.addHandler(file_handler)
file_handler.setFormatter(formatter)

def log_fact_check(claim: str, evidence: str, llm_response: str):
    """Record one fact-check as a JSON line on the evaluation logger.

    Args:
        claim: The claim that was checked.
        evidence: The evidence text given to the model.
        llm_response: The model output to store alongside the claim.
    """
    payload = json.dumps(
        {
            "claim": claim,
            "evidence_context": evidence,
            "llm_response": llm_response,
        }
    )
    eval_logger.info(payload)
    # Short confirmation on the root logger
    logging.info("Fact-check entry logged for evaluation.")

def compute_keyword_overlap(explanation: str, evidence: str) -> float:
    """Compute a fallback score based on keyword overlap.

    The score is the fraction of distinct (lower-cased) explanation tokens
    that also occur in the evidence.

    Args:
        explanation: Text whose tokens are being checked.
        evidence: Text the explanation is compared against.

    Returns:
        A value in [0, 1] rounded to 4 decimals; 0.0 when either text
        produces no tokens.
    """
    import re

    def _tokens(text: str) -> set:
        lowered = text.lower()
        try:
            # Imported here because the file never imports word_tokenize at
            # module level, so the bare name was undefined.
            from nltk.tokenize import word_tokenize
            return set(word_tokenize(lowered))
        except (ImportError, LookupError):
            # NLTK or its tokenizer data is unavailable: fall back to a plain
            # word/punctuation split so this fallback metric never raises.
            return set(re.findall(r"\w+|[^\w\s]", lowered))

    explanation_tokens = _tokens(explanation)
    evidence_tokens = _tokens(evidence)
    if not explanation_tokens or not evidence_tokens:
        return 0.0
    common_tokens = explanation_tokens & evidence_tokens
    return round(len(common_tokens) / len(explanation_tokens), 4)
# Initialize Selenium WebDriver
def init_selenium_driver():
    """Create a headless Chrome WebDriver.

    Returns:
        The driver instance, or None when it cannot be created (the failure
        is logged as a warning).
    """
    chrome_flags = (
        "--headless",
        "--disable-gpu",
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    )
    try:
        chrome_options = Options()
        for flag in chrome_flags:
            chrome_options.add_argument(flag)
        return webdriver.Chrome(options=chrome_options)
    except Exception as e:
        logging.warning(f"Failed to initialize Selenium driver: {e}")
        return None


def simple_llm_check(claim: str, explanation: str, evidence: str) -> float:
    """Simple LLM-based check for fallback scoring.

    Asks the module-level `llm` to rate how well `explanation` is supported
    by `evidence` for `claim`, then parses the numeric score from the reply.

    Args:
        claim: The claim under review.
        explanation: The explanation to grade.
        evidence: The evidence text the explanation should reflect.

    Returns:
        The score clamped to [0, 1] and rounded to 4 decimals; 0.0 when no
        LLM is available, the call fails, or no score can be parsed.
    """
    import re

    if not llm:
        return 0.0
    prompt = PromptTemplate(
        template="""
        You are a fact-checker evaluating whether an explanation accurately reflects the provided evidence for a given claim.
        - Claim: {claim}
        - Explanation: {explanation}
        - Evidence: {evidence}
        Instructions:
        1. Compare the explanation to the evidence.
        2. Assess if the explanation's key points (e.g., facts, conclusions) are supported by the evidence.
        3. Return a score from 0 to 1, where 1 is completely accurate and 0 is completely inaccurate.
        Output: Score: [0-1]
        """,
        input_variables=["claim", "explanation", "evidence"]
    )
    chain = prompt | llm | StrOutputParser()
    try:
        result = chain.invoke({"claim": claim, "explanation": explanation, "evidence": evidence})
        # Extract the number that follows "Score:" instead of float()-ing
        # everything after it: replies such as "Score: 0.8\n<reasoning>",
        # "**Score:** 0.8" or "Score: [0.8]" previously failed to parse and
        # were silently scored 0.0. The last occurrence wins, as before.
        found = re.findall(r"Score:\s*\**\s*\[?\s*(\d*\.?\d+)", result, flags=re.IGNORECASE)
        if not found:
            raise ValueError("no numeric score found in model output")
        score = float(found[-1])
        # The prompt asks for 0..1; clamp in case the model ignores that.
        score = min(1.0, max(0.0, score))
        return round(score, 4)
    except Exception as e:
        logging.warning(f"Simple LLM check failed: {e}")
        return 0.0
def evaluate_fact_check(claim, explanation, evidence, responses, sources, llm=None):
    """Score a finished fact-check on four heuristic axes.

    Args:
        claim: The claim text.
        explanation: The explanation chosen for the claim.
        evidence: The evidence text the explanation was generated from.
        responses: List of dicts; entries carrying a "verdict" key are used
            for the consistency score.
        sources: Source strings (URLs in the pipeline) looked up verbatim in
            `evidence`.
        llm: When falsy, the LLM grounding component defaults to 0.5 and
            `simple_llm_check` is not called.

    Returns:
        Dict with "content_quality", "evidence_grounding", "consistency" and
        "relevance", each rounded to 2 decimals.
    """
    import re

    def tokenize(text):
        lowered = text.lower()
        try:
            from nltk.tokenize import word_tokenize
            return set(word_tokenize(lowered))
        except (ImportError, LookupError):
            # Tokenizer data missing: a plain regex split keeps the metric
            # available instead of failing the whole evaluation.
            return set(re.findall(r"\w+|[^\w\s]", lowered))

    def length_score(text, min_words=80, max_words=400):
        # 1.0 inside [min_words, max_words]; scaled down proportionally outside.
        words = text.split()
        if len(words) < min_words:
            return len(words) / min_words
        if len(words) > max_words:
            return max_words / len(words)
        return 1.0

    def neutrality_score(text):
        # Each occurrence of a loaded adverb costs 0.1, floored at 0.
        biased_words = ["definitely", "obviously", "clearly", "undoubtedly"]
        lowered = text.lower()
        penalty = sum(lowered.count(w) for w in biased_words)
        return max(0.0, 1 - (penalty * 0.1))

    def source_coverage(evidence, sources):
        # Share of sources whose string appears verbatim in the evidence.
        return sum(1 for s in sources if s in evidence) / max(1, len(sources))

    def verdict_agreement(responses):
        # Share of responses that carry the most common verdict.
        verdicts = [r["verdict"] for r in responses if "verdict" in r]
        if not verdicts:
            return 0.0
        top_count = collections.Counter(verdicts).most_common(1)[0][1]
        return top_count / len(verdicts)

    def keyword_relevance(claim, explanation):
        # Share of distinct claim tokens that reappear in the explanation.
        claim_words = tokenize(claim)
        expl_words = tokenize(explanation)
        return len(claim_words & expl_words) / max(1, len(claim_words))

    def llm_grounding_score(claim, explanation, evidence):
        if not llm:
            return 0.5
        try:
            return simple_llm_check(claim, explanation, evidence)
        except Exception as e:
            # Was a bare `except:`; keep the neutral fallback but stop
            # swallowing KeyboardInterrupt/SystemExit and record the cause.
            logging.warning(f"LLM grounding check failed: {e}")
            return 0.5

    return {
        "content_quality": round((length_score(explanation) + neutrality_score(explanation)) / 2, 2),
        "evidence_grounding": round((source_coverage(evidence, sources) + llm_grounding_score(claim, explanation, evidence)) / 2, 2),
        "consistency": round(verdict_agreement(responses), 2),
        "relevance": round(keyword_relevance(claim, explanation), 2),
    }


# ================================
# SECTION 2: CORE PIPELINE
# ================================
def tavily_search(query: str, num_results: int = 15):
    """Search the web through Tavily and return the URLs of the hits.

    Args:
        query: Search text.
        num_results: Passed to the search tool as `max_results`.

    Returns:
        List of non-empty "url" values from the results; an empty list when
        no API key is set, the reply has an unexpected shape, or the search
        raises.
    """
    if not TAVILY_API_KEY:
        logging.error("Tavily API key not available.")
        return []
    try:
        payload = TavilySearch(max_results=num_results).invoke(query)
        # Two reply shapes are accepted: {"results": [...]} or a bare list of dicts.
        if isinstance(payload, dict) and "results" in payload:
            hits = payload["results"]
        elif isinstance(payload, list) and all(isinstance(hit, dict) for hit in payload):
            hits = payload
        else:
            logging.error(f"Tavily search returned an unexpected format: {payload}")
            return []
        return [hit.get("url") for hit in hits if hit.get("url")]
    except Exception as e:
        logging.error(f"Tavily search failed: {e}")
        return []

def _load_soup(driver, url):
    """Fetch `url` and return its parsed HTML, via Selenium when `driver` is set, else via requests."""
    if driver:
        driver.get(url)
        WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((By.TAG_NAME, "body"))
        )
        # Scroll multiple times to load full content
        for _ in range(3):
            driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            WebDriverWait(driver, 5).until(
                EC.presence_of_element_located((By.TAG_NAME, "body"))
            )
        return BeautifulSoup(driver.page_source, "html.parser")
    headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
    response = requests.get(url, headers=headers, timeout=15)
    response.raise_for_status()
    return BeautifulSoup(response.text, "html.parser")


def _extract_main_text(soup) -> str:
    """Strip page chrome from `soup` and return the text of its main content container (whole page as fallback)."""
    for element in soup(["script", "style", "nav", "header", "footer", "aside", "form", "iframe"]):
        element.decompose()

    # Tag-only selectors use an empty dict. Passing attrs=None is not "no
    # filter": Beautiful Soup interprets a non-dict `attrs` as a CSS-class
    # filter, so None would skip <article>/<main> tags that carry a class.
    # NOTE(review): verified against bs4 4.12 behaviour — confirm for the
    # installed version.
    content_selectors = [
        ("article", {}),
        ("main", {}),
        ("div", {"class": ["content", "article-body", "post-content", "entry-content", "article-content"]}),
        ("section", {"class": ["content", "article"]}),
        ("div", {"id": ["content", "main-content", "article"]}),
        ("div", {"role": "main"}),
    ]
    for tag, attrs in content_selectors:
        main_content = soup.find(tag, attrs=attrs)
        if main_content:
            text = main_content.get_text(separator=" ", strip=True)
            if text:
                return text
            # First matching container is empty: fall back to the whole page.
            break
    return soup.get_text(separator=" ", strip=True)


def get_content_from_urls(urls: list[str]) -> str:
    """Download each URL and return the extracted article text of all pages.

    Pages are loaded with a headless Chrome driver when one can be created,
    otherwise with a plain HTTP GET. Pages yielding fewer than 300 characters
    are skipped, and each kept page is truncated to 8000 characters.

    Args:
        urls: Page URLs to fetch.

    Returns:
        "Source: <url>\\nContent: <text>..." entries joined by a "---"
        separator; an empty string when nothing usable was fetched.
    """
    if not urls:
        # Nothing to fetch: avoid starting a browser at all.
        return ""

    all_content = []
    driver = init_selenium_driver()
    try:
        for url in urls:
            try:
                text = _extract_main_text(_load_soup(driver, url))
                if len(text.strip()) < 300:
                    logging.warning(f"Content from {url} is too short, likely navigational.")
                    continue
                all_content.append(f"Source: {url}\nContent: {text[:8000]}...")
            except Exception as e:
                # One bad page must not abort the remaining URLs.
                logging.warning(f"Could not fetch {url}: {e}")
    finally:
        # Release the browser even if the loop is interrupted.
        if driver:
            try:
                driver.quit()
            except Exception as e:
                logging.warning(f"Failed to close Selenium driver: {e}")
    return "\n\n---\n\n".join(all_content)

def vet_sources(urls: list[str]) -> list[str]:
    """Keep only URLs hosted on a reputable domain.

    A URL qualifies when its hostname equals one of the listed domains or is
    a subdomain of one. Matching is done on the parsed hostname rather than
    on the raw string, so a domain name that merely appears in the path or
    query ("evil.example/?ref=bbc.com") or as a suffix of another name
    ("notreuters.com") does not count.

    Args:
        urls: Candidate URLs.

    Returns:
        The reputable subset in input order, or `urls` unchanged when no URL
        qualifies.
    """
    from urllib.parse import urlparse

    reputable_domains = (
        "reuters.com", "apnews.com", "bbc.com", "nytimes.com", "wsj.com",
        "washingtonpost.com", "theguardian.com", "snopes.com", "politifact.com",
        "healthline.com", "medlineplus.gov", "harvard.edu",
    )

    def is_reputable(url: str) -> bool:
        try:
            # A scheme-less "example.com/x" has no netloc unless prefixed with "//".
            parsed = urlparse(url if "//" in url else f"//{url}")
            host = (parsed.hostname or "").rstrip(".")
        except ValueError:
            return False
        return any(host == d or host.endswith("." + d) for d in reputable_domains)

    vetted = [u for u in urls if is_reputable(u)]
    return vetted if vetted else urls

def synthesize_evidence(claim: str, evidence: str, consistency_level: int = 3) -> list:
    """Ask the LLM for a verdict on `claim` several times in parallel.

    Args:
        claim: The claim to judge.
        evidence: Source text supplied to the model.
        consistency_level: Number of independent attempts to run.

    Returns:
        One {"verdict", "explanation"} dict per attempt. A failed attempt is
        reported as verdict "Error" with the failure text as explanation.
        When no LLM is available, a single "Error" entry is returned.
    """
    if not llm:
        return [{"verdict": "Error", "explanation": "LLM not available."}]
    prompt_template = """
    **Role:** You are a professional, unbiased fact-checker.
    **Context:** You have been given a user's claim and the text content extracted from several online sources.
    **Instructions (Chain-of-Thought):**
    1. **Analyze Evidence:** Review the content to identify consensus and contradictions.
    2. **Formulate Verdict:** Choose the most appropriate verdict: [True, Mostly True, Half True, Misleading, False, Unverifiable].
    3. **Generate Explanation:** Write a detailed, neutral explanation for your verdict.
    4. **Format Output:** Present your findings in the strict format:
        VERDICT: [Your chosen verdict]
        EXPLANATION: [Your detailed explanation]
    **User's Claim:** "{claim}"
    **Evidence from Sources:**
    {evidence}
    """
    prompt = PromptTemplate(template=prompt_template, input_variables=["claim", "evidence"])
    chain = prompt | llm | StrOutputParser()

    def parse(resp: str) -> dict:
        # Split on the two markers; a missing marker gets a descriptive error
        # instead of an opaque "list index out of range".
        _, found, rest = resp.partition("VERDICT:")
        if not found:
            raise ValueError("model response has no VERDICT section")
        verdict_part, found, explanation_part = rest.partition("EXPLANATION:")
        if not found:
            raise ValueError("model response has no EXPLANATION section")
        # The prompt shows the verdict in square brackets and models often add
        # markdown emphasis; strip that decoration so "[True]", "**True**"
        # and "True" are counted as the same verdict in the majority vote.
        verdict = verdict_part.strip().strip("*[]\"'` \t\r\n")
        if not verdict:
            raise ValueError("model response has an empty verdict")
        return {"verdict": verdict, "explanation": explanation_part.strip()}

    def run_once(_):
        try:
            return parse(chain.invoke({"claim": claim, "evidence": evidence}))
        except Exception as e:
            logging.error(f"Error during synthesis attempt: {e}")
            return {"verdict": "Error", "explanation": str(e)}

    # max_workers must be positive even when no attempts are requested.
    with concurrent.futures.ThreadPoolExecutor(max_workers=max(1, consistency_level)) as executor:
        responses = list(executor.map(run_once, range(consistency_level)))
    return responses

def get_consistent_response(responses: list) -> tuple[str, str]:
    """Pick the majority verdict among several synthesis attempts.

    Attempts that failed (verdict "Error" or no verdict at all) are left out
    of the vote whenever at least one attempt succeeded, so two transient
    failures can no longer outvote a valid answer.

    Args:
        responses: Dicts with "verdict" and "explanation" keys.

    Returns:
        (verdict, explanation) where the explanation comes from the first
        response carrying the winning verdict. ("Error", "No responses
        generated.") for an empty list.
    """
    if not responses:
        return "Error", "No responses generated."

    successful = [r for r in responses if r.get("verdict") not in (None, "Error")]
    candidates = successful or responses

    counts = collections.Counter(r.get("verdict", "Error") for r in candidates)
    majority = counts.most_common(1)[0][0]
    logging.info(f"Majority verdict '{majority}' chosen: {counts}")
    # The winning verdict was counted from `candidates`, so a match always exists.
    winner = next(r for r in candidates if r.get("verdict", "Error") == majority)
    return majority, winner.get("explanation", "")

# ================================
# SECTION 3: INPUT/OUTPUT HANDLERS
# ================================
def speech_to_text(audio_file):
    """Transcribe an audio file with Google speech recognition.

    The input is first converted to a temporary WAV file, which is removed
    again whether or not transcription succeeds.

    Args:
        audio_file: Path to the audio file; falsy values mean "no audio".

    Returns:
        The transcript, "" when no file was given, or a string starting with
        "Error: " when conversion or recognition fails.
    """
    if not audio_file:
        return ""
    tmp_wav = None
    try:
        # AudioSegment was used without ever being imported, so every call
        # ended in a NameError; pydub is installed by the notebook's pip cell.
        from pydub import AudioSegment

        # NamedTemporaryFile creates the file atomically; tempfile.mktemp is
        # deprecated because the name can be claimed by someone else first.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as handle:
            tmp_wav = handle.name
        exported = AudioSegment.from_file(audio_file).export(tmp_wav, format="wav")
        # export() hands back a file object — close it if so (TODO confirm
        # against the installed pydub version).
        if hasattr(exported, "close"):
            exported.close()
        with sr.AudioFile(tmp_wav) as source:
            audio_data = recognizer.record(source)
        return recognizer.recognize_google(audio_data)
    except Exception as e:
        logging.error(f"Speech-to-text error: {e}")
        return f"Error: {e}"
    finally:
        # Previously the temp file was only removed on the success path.
        if tmp_wav:
            try:
                os.remove(tmp_wav)
            except OSError:
                pass

def text_to_speech(text):
    """Synthesize English speech for `text` with Google TTS.

    Returns:
        Path of the MP3 written to the current directory (uniquely named per
        call), or None when synthesis or saving fails.
    """
    try:
        speech = gTTS(text=text, lang='en')
        out_name = "response_" + uuid.uuid4().hex + ".mp3"
        speech.save(out_name)
    except Exception as e:
        logging.error(f"TTS error: {e}")
        return None
    return out_name

# ================================
# SECTION 4: MAIN PIPELINE
# ================================
def fact_checking_pipeline(audio_input, text_input):
    """Run the full fact-check for a spoken or typed claim.

    Steps: obtain the claim (audio takes precedence over text), search the
    web, prefer reputable sources, scrape their content, ask the LLM for a
    verdict several times, take the majority, log the result, synthesize a
    spoken explanation, store the claim in the FAISS index, and score the
    result.

    Args:
        audio_input: Path to an audio recording of the claim, or a falsy value.
        text_input: Typed claim; used when there is no audio or when the
            audio cannot be transcribed. May be None.

    Returns:
        A 5-tuple (explanation, verdict, audio_path, sources_markdown,
        eval_scores). Every exit path returns five values because the UI
        binds five output components; on early exits the audio path and
        evaluation scores are None.
    """
    global vector_db
    from urllib.parse import urlparse

    def early_exit(message, sources_note):
        return message, "Unverifiable", None, sources_note, None

    typed_claim = (text_input or "").strip()
    if audio_input:
        claim = speech_to_text(audio_input)
        # speech_to_text reports failures as an "Error: ..." string; that
        # text must not be searched and fact-checked as if it were the claim.
        if claim.startswith("Error:"):
            logging.warning(f"Audio transcription failed: {claim}")
            if not typed_claim:
                return early_exit(
                    "Could not transcribe the audio. Please try again or type your claim.",
                    "No sources analyzed.",
                )
            claim = typed_claim
    else:
        claim = typed_claim
    if not claim:
        return early_exit("Please provide a claim.", "No sources analyzed.")

    urls = tavily_search(claim)
    if not urls:
        return early_exit("No online sources found.", "No sources.")
    vetted = vet_sources(urls)
    evidence = get_content_from_urls(vetted)
    if not evidence:
        return early_exit("Could not fetch content.", "No content.")

    responses = synthesize_evidence(claim, evidence, consistency_level=3)
    verdict, explanation = get_consistent_response(responses)
    log_fact_check(claim, evidence, explanation)

    audio_resp = text_to_speech(explanation)

    # Store the claim and its sources in the shared FAISS index (created on first use).
    if embedding_function:
        metadata = [{"sources": ", ".join(vetted)}]
        if vector_db is None:
            vector_db = FAISS.from_texts([claim], embedding_function, metadatas=metadata)
        else:
            vector_db.add_texts([claim], metadatas=metadata)

    def link_label(url):
        # Host part of the URL; fall back to the raw URL when it has none
        # (the former split('//')[1] raised IndexError on such input).
        try:
            return urlparse(url).netloc or url
        except ValueError:
            return url

    src_md = "\n".join(f"- [{link_label(u)}]({u})" for u in vetted)

    # Run evaluation
    eval_scores = evaluate_fact_check(claim, explanation, evidence, responses, vetted, llm)

    # Print to logs
    logging.info(f"Evaluation Scores: {json.dumps(eval_scores, indent=2)}")

    # Return everything including eval
    return explanation, verdict, audio_resp, src_md, eval_scores


# ================================
# SECTION 5: GRADIO UI
# ================================
# Top-level Gradio app with two tabs: a static showcase page and the
# interactive fact-checking form.
with gr.Blocks(theme=gr.themes.Soft(), title="Veracity Engine") as demo:
    # Tab 1: static description of the project (Markdown only, no callbacks)
    with gr.Tab("🚀 Showcase"):
        gr.Markdown("# 🔎 Veracity Engine: Evidence-Based Fact-Checking")
        with gr.Row():
            with gr.Column():
                gr.Markdown("## Key Features")
                gr.Markdown(
                    """
                    - **🗣️ Voice & Text Input:** Accepts claims via speech or text.
                    - **🌐 Real-Time Web Search:** Uses the Tavily API for reliable, up-to-date info.
                    - **🛡️ Source Credibility Analysis:** Prioritizes reputable sources.
                    - **⚖️ Nuanced Verdicts:** Provides a spectrum of ratings (e.g., True, Misleading).
                    - **✍️ Evidence Synthesis:** Generates detailed explanations for its verdicts.
                    - **💾 Evidence Caching:** Uses a FAISS vector database for in-memory caching.
                    """
                )
            with gr.Column():
                gr.Markdown("## Skills & Concepts Demonstrated")
                gr.Markdown(
                    """
                    - **Grounded RAG Pipelines:** Combining real-time data with LLM generation.
                    - **Prompt Engineering:** Using Chain-of-Thought (CoT) and RCI patterns.
                    - **Self-Consistency:** Running inference multiple times to find the majority verdict.
                    - **API Integration:** Interfacing with Google Gemini and Tavily Search APIs.
                    - **Vector Database Caching:** Using FAISS for efficient in-memory storage.
                    - **LLM-based Evaluation:** Using synthetic QA pairs and LLM-as-a-judge for correctness.
                    """
                )
    # Tab 2: the claim form. Left column holds the inputs, right column the results.
    with gr.Tab("Veracity Engine"):
        gr.Markdown("## Submit a Claim for Verification")
        with gr.Row():
            with gr.Column(scale=1):
                # type="filepath": the callback receives a path to the audio file
                audio_input = gr.Audio(type="filepath", label="Record or Upload Your Claim")
                text_input = gr.Textbox(label="Or Type Your Claim Here", placeholder="e.g., 'Does drinking coffee help you live longer?'")
                submit_button = gr.Button("Check Fact", variant="primary")
            with gr.Column(scale=2):
                verdict_output = gr.Label(label="Verdict")
                explanation_output = gr.Textbox(label="Explanation", lines=8, interactive=False)
                audio_output = gr.Audio(label="Voice Explanation")
                sources_output = gr.Markdown(label="Evidence Sources")
                eval_output = gr.JSON(label="Evaluation Scores")

               
        # Wire the button to the pipeline. The callback's return values are
        # mapped positionally onto the five components listed in `outputs`.
        submit_button.click(
        fn=fact_checking_pipeline,
        inputs=[audio_input, text_input],
        outputs=[explanation_output, verdict_output, audio_output, sources_output, eval_output],
        api_name="verify_claim"
    )


# --- LAUNCH ---
# Start the web server only when this code runs as the main program
if __name__ == "__main__":
    demo.launch()

[nltk_data] Downloading package punkt to /usr/share/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


* Running on local URL:  http://127.0.0.1:7862
It looks like you are running Gradio on a hosted a Jupyter notebook. For the Gradio app to work, sharing must be enabled. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

* Running on public URL: https://032e4e5cef97b360b2.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)
