In [11]:
import pandas as pd
import numpy as np
import nltk
import re
import time
import json
import string
import torch
import requests
import random
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords, wordnet
from nltk import pos_tag
from nltk.stem import WordNetLemmatizer
from sentence_transformers import SentenceTransformer, util
from tqdm import tqdm, trange
from pathlib import Path
from newspaper import Article
import ollama
from ddgs import DDGS

In [12]:
# NLTK setup
# Newer NLTK releases look up `punkt_tab` and `averaged_perceptron_tagger_eng`
# instead of the legacy pickled resources, so both generations are requested.
# Unknown ids on an older NLTK simply fail to download and are reported below.
NLTK_RESOURCES = (
    'punkt',
    'punkt_tab',
    'stopwords',
    'wordnet',
    'averaged_perceptron_tagger',
    'averaged_perceptron_tagger_eng',
)
# nltk.download returns a falsy value when a resource could not be fetched.
_nltk_failed = [name for name in NLTK_RESOURCES if not nltk.download(name, quiet=True)]
if _nltk_failed:
    # Not fatal: previously cached copies may still be present on disk. Surface it
    # so a later LookupError from NLTK can be traced back to this step.
    print(f"⚠️ NLTK resources not downloaded (offline or unavailable for this NLTK version): {_nltk_failed}")

# === Setup ===
device = 'cuda' if torch.cuda.is_available() else 'cpu'  # prefer GPU when available
stop_words = set(stopwords.words('english'))
lemmatizer = WordNetLemmatizer()
# Sentence-embedding model shared by the scoring helpers below.
model = SentenceTransformer('all-MiniLM-L6-v2', device=device)

# === Helpers ===
def clean_text(text):
    """Return `text` lower-cased with every `string.punctuation` character removed."""
    drop_punctuation = str.maketrans('', '', string.punctuation)
    lowered = text.lower()
    return lowered.translate(drop_punctuation)

def get_wordnet_pos(treebank_tag):
    """Map a Treebank POS tag to the matching WordNet POS constant.

    Tags starting with J/V/N/R map to ADJ/VERB/NOUN/ADV respectively;
    anything else falls back to NOUN.
    """
    prefix_to_attr = (
        ('J', 'ADJ'),
        ('V', 'VERB'),
        ('N', 'NOUN'),
        ('R', 'ADV'),
    )
    for prefix, attr_name in prefix_to_attr:
        if treebank_tag.startswith(prefix):
            return getattr(wordnet, attr_name)
    return wordnet.NOUN

def extract_keywords(text):
    """Return the set of lemmatized, alphabetic, non-stopword tokens found in `text`.

    The text is cleaned with `clean_text`, tokenized, POS-tagged, and each
    surviving token is lemmatized using the WordNet POS derived from its tag.
    """
    tagged_tokens = pos_tag(word_tokenize(clean_text(text)))
    return {
        lemmatizer.lemmatize(token, get_wordnet_pos(tag))
        for token, tag in tagged_tokens
        if token not in stop_words and token.isalpha()
    }

def keyword_score(kw1, kw2):
    """Percentage (0-100, two decimals) of the keywords in `kw1` that also occur in `kw2`.

    Returns 0.0 when `kw1` is empty.
    """
    if not kw1:
        return 0.0
    shared = len(kw1 & kw2)
    return round(shared / len(kw1) * 100, 2)

def semantic_score(claim, evidence):
    """Cosine similarity between the embeddings of `claim` and `evidence`, scaled by 100.

    Both texts are encoded with the module-level sentence-transformer `model`;
    the result is rounded to two decimals.
    """
    claim_vec = model.encode(claim, convert_to_tensor=True)
    evidence_vec = model.encode(evidence, convert_to_tensor=True)
    similarity = util.cos_sim(claim_vec, evidence_vec)
    cosine = float(similarity[0][0])
    return round(cosine * 100, 2)

def compute_score(claim, evidence):
    """Average of the keyword-overlap score and the semantic score, rounded to two decimals."""
    claim_keywords = extract_keywords(claim)
    evidence_keywords = extract_keywords(evidence)
    overlap_part = keyword_score(claim_keywords, evidence_keywords)
    semantic_part = semantic_score(claim, evidence)
    combined = (overlap_part + semantic_part) / 2
    return round(combined, 2)

def call_llm(prompt):
    """Send `prompt` as a single user message to the `gemma3` model via ollama.

    Returns the stripped reply text, or the sentinel string "ERROR" if
    anything goes wrong while calling the model or reading its reply.
    """
    messages = [{"role": "user", "content": prompt}]
    try:
        reply = ollama.chat(model='gemma3', messages=messages)
        content = reply['message']['content']
        return content.strip()
    except Exception:
        # Best-effort: callers treat "ERROR" as an ordinary (unhelpful) answer.
        return "ERROR"

def classify_stance(claim, evidence):
    """Ask the LLM whether `evidence` supports or refutes `claim`.

    Returns "Refuted" if the lower-cased answer contains "refute",
    otherwise "Supported" if it contains "support", otherwise "Uncertain".
    """
    question = f"Claim: {claim}\nEvidence: {evidence}\nAnswer in one word: Does the evidence support, refute, or is uncertain about the claim?"
    answer = call_llm(question).lower()
    # Order matters: "refute" takes precedence over "support".
    for needle, stance in (("refute", "Refuted"), ("support", "Supported")):
        if needle in answer:
            return stance
    return "Uncertain"

def extract_url(text):
    """Return the first http(s) URL found in `text`, or None if there is none.

    The raw match runs up to the next whitespace, so a URL cited at the end of
    a sentence or inside markdown (``[title](https://…).``) would otherwise
    carry trailing punctuation. That trailing punctuation is trimmed; closing
    brackets are only trimmed when they are unbalanced, so URLs that
    legitimately contain parentheses are left intact.

    Args:
        text: Text to scan. Empty or None input yields None.

    Returns:
        The cleaned URL string, or None when no URL is present.
    """
    if not text:
        return None
    found = re.search(r'https?://\S+', text)
    if found is None:
        return None

    url = found.group(0)
    openers = {')': '(', ']': '[', '}': '{'}
    while url:
        last = url[-1]
        if last in '.,;:!?\'"*>':
            # Sentence punctuation, quotes, or markdown emphasis / autolink closers.
            url = url[:-1]
        elif last in openers and url.count(last) > url.count(openers[last]):
            # Unbalanced closer belongs to the surrounding text, not the URL.
            url = url[:-1]
        else:
            break
    return url or None

def is_url_valid(url):
    """Return True only if a HEAD request to `url` (following redirects) answers 200.

    Any exception raised while making the request counts as invalid.
    """
    try:
        status = requests.head(url, timeout=5, allow_redirects=True).status_code
    except Exception:
        return False
    return status == 200

def search_duckduckgo(query, max_results=1):
    """Return the link of the first usable DuckDuckGo text result for `query`.

    Args:
        query: Search string.
        max_results: How many leading results to inspect. The first one that
            carries a non-empty "href" is returned.

    Returns:
        The result URL, or None when no inspected result has a link or the
        search itself fails.
    """
    try:
        with DDGS() as ddgs:
            for position, hit in enumerate(ddgs.text(query)):
                if position >= max_results:
                    break
                link = hit.get("href")
                if link:
                    return link
    except Exception as exc:
        # Best-effort lookup, like the other network helpers in this file: the
        # caller already handles None, so a failed search must not abort the claim.
        print(f"⚠️ DuckDuckGo search failed: {exc}")
    return None

def has_converged(score_trace, epsilon=1.0, patience=2):
    """Tell whether the last `patience` score changes are all smaller than `epsilon`.

    `score_trace` is a sequence of dicts with a 'score' entry. At least
    `patience + 1` entries are required; with fewer the answer is False.
    """
    needed = patience + 1
    if len(score_trace) < needed:
        return False
    # Scores of the trailing window, oldest first.
    window = [entry['score'] for entry in score_trace[len(score_trace) - needed:]]
    for earlier, later in zip(window, window[1:]):
        if not abs(later - earlier) < epsilon:
            return False
    return True

# === Fact-checking pipeline ===
def process_claim(claim, max_attempts=6):
    """Iteratively ask the LLM to fact-check `claim` and keep the best-scoring answer.

    Each attempt:
      1. queries the LLM,
      2. extracts the cited URL and, if it does not validate, swaps in the first
         DuckDuckGo hit for the claim (or marks the URL "NOT_FOUND"),
      3. scores the explanation against the claim with `compute_score`.
    The loop stops early once at least three attempts were made and the score
    trace has converged. The best explanation is finally classified with
    `classify_stance`.

    Args:
        claim: The claim text to check.
        max_attempts: Upper bound on the number of LLM attempts.

    Returns:
        dict with keys "claim", "evidence", "url", "score", "label",
        "final_prompt", "attempts" and "score_trace" (JSON string).
    """
    prompt = f"""Fact-check the following claim. Respond in 2–3 sentences. 
You must cite a reliable news URL at the end of your response (include only one link).
Claim: \"{claim}\""""

    best_score = 0
    has_best = False  # becomes True once any attempt has been recorded as best
    score_trace = []
    final_explanation = ""
    final_prompt = prompt
    final_url = None

    for attempt in range(1, max_attempts + 1):
        response = call_llm(prompt)
        explanation = response.strip()
        url = extract_url(explanation)

        if url and not is_url_valid(url):
            alt_url = search_duckduckgo(claim)
            if alt_url:
                # A callable replacement inserts alt_url literally; a plain string
                # would be parsed as a template (backslashes / group references).
                explanation = re.sub(r'https?://\S+', lambda _match: alt_url, explanation)
                url = alt_url
            else:
                url = "NOT_FOUND"

        score = compute_score(claim, explanation)
        score_trace.append({'attempt': attempt, 'score': score, 'explanation': explanation, 'url': url})

        # Always keep the first attempt so a non-positive score (cosine similarity
        # can be negative) does not leave the result without any evidence.
        if not has_best or score > best_score:
            has_best = True
            best_score = score
            final_explanation = explanation
            final_prompt = prompt
            final_url = url

        if attempt >= 3 and has_converged(score_trace):
            break

        if attempt < max_attempts:
            # NOTE(review): each call_llm request is a fresh single-message chat, so
            # this follow-up prompt carries no memory of the previous explanation.
            prompt = f"Improve your fact-checking explanation for this claim and include a real news URL:\n\"{claim}\""
            time.sleep(0.3)  # brief pause between consecutive LLM calls

    final_label = classify_stance(claim, final_explanation)
    return {
        "claim": claim,
        "evidence": final_explanation,
        "url": final_url,
        "score": best_score,
        "label": final_label,
        "final_prompt": final_prompt,
        "attempts": len(score_trace),
        "score_trace": json.dumps(score_trace)
    }

# === Scrutinizer functions ===
def fetch_article(url: str) -> str:
    """Download and parse the article at `url`, returning its stripped body text.

    Returns an empty string if downloading or parsing raises.
    """
    try:
        article = Article(url, language="en")
        article.download()
        article.parse()
        body = article.text
        return body.strip()
    except Exception:
        return ""

def max_sent_cosine(model, source: str, article: str) -> float:
    """Highest cosine similarity between `source` and any single sentence of `article`.

    The article is split with `nltk.sent_tokenize`; 0.0 is returned when it
    yields no sentences.
    """
    sentences = nltk.sent_tokenize(article)
    if len(sentences) == 0:
        return 0.0
    sentence_vecs = model.encode(sentences, convert_to_tensor=True)
    source_vec = model.encode(source, convert_to_tensor=True)
    # Row 0 holds the similarities of the single source vector to every sentence.
    similarities = util.cos_sim(source_vec, sentence_vecs)[0]
    best = similarities.max()
    return float(best.item())

def empirical_p(null_scores, actual_score):
    """Empirical p-value of `actual_score` against a null distribution.

    Computes (#{null >= actual} + 1) / (N + 1); the +1 terms keep the
    estimate strictly positive.

    Args:
        null_scores: Null-distribution scores. Any array-like is accepted
            (list, tuple, NumPy array, pandas Series); an empty input gives 1.0.
        actual_score: Observed score to compare against the null scores.

    Returns:
        The p-value as a float in (0, 1].
    """
    # Coerce first: a plain Python list does not support `>=` against a scalar.
    null_arr = np.asarray(null_scores, dtype=float)
    exceed = int(np.sum(null_arr >= actual_score))
    # `.size` counts every element, so numerator and denominator stay consistent
    # even if the scores arrive with more than one dimension.
    return float((exceed + 1) / (null_arr.size + 1))

# === Final Label Resolver ===
def decide_label(row):
    """Combine the stance label with the article-match flag into a final verdict.

    `row` must provide 'label' and 'match'. Each known label maps to a
    (matched, unmatched) pair of verdicts; any other label yields "Unknown".
    """
    verdicts = {
        "Supported": ("Strongly Supported", "Weakly Supported"),
        "Refuted": ("Strongly Refuted", "Weakly Refuted"),
        "Uncertain": ("Possibly True", "Unknown"),
    }
    stance = row['label']
    matched = row['match']

    pair = verdicts.get(stance)
    if pair is None:
        return "Unknown"
    when_matched, when_unmatched = pair
    return when_matched if matched else when_unmatched


[nltk_data] Error loading punkt: <urlopen error [Errno 104] Connection
[nltk_data]     reset by peer>
[nltk_data] Error loading stopwords: <urlopen error [Errno 104]
[nltk_data]     Connection reset by peer>
[nltk_data] Error loading wordnet: <urlopen error [Errno 104]
[nltk_data]     Connection reset by peer>
[nltk_data] Error loading averaged_perceptron_tagger: <urlopen error
[nltk_data]     [Errno 104] Connection reset by peer>


In [13]:
# === Main ===
def _scrutinize_evidence(fc_df, text_field):
    """Check each evidence text against the article behind its cited URL.

    Rows with a missing value or with evidence of 10 characters or fewer are
    dropped. For the remaining rows the article is fetched, its word count is
    validated, and the maximum sentence-level cosine similarity is computed.

    Returns a DataFrame with columns: `text_field`, "url", "article",
    "word_len", "valid_article", "max_sim", "p_value", "match". The frame is
    empty (but has all columns) when no row is usable.
    """
    df = fc_df[[text_field, "url"]].dropna()
    df = df[df[text_field].str.len() > 10].reset_index(drop=True)

    if df.empty:
        # Happens when the LLM cited no URL or fact-checking failed ("ERROR"
        # placeholder). A quantile over an empty column is NaN and cannot be
        # turned into an int, so skip the article check entirely.
        print("⚠️ No usable evidence/URL pair to scrutinize; skipping article check.")
        return df.assign(
            article=pd.Series(dtype=object),
            word_len=pd.Series(dtype=int),
            valid_article=pd.Series(dtype=bool),
            max_sim=pd.Series(dtype=float),
            p_value=pd.Series(dtype=float),
            match=pd.Series(dtype=bool),
        )

    print("📰 Downloading article...")
    df["article"] = [fetch_article(u) for u in tqdm(df["url"], desc="Fetching")]
    df["word_len"] = df["article"].str.split().str.len().fillna(0).astype(int)
    min_article_len = max(20, int(df["word_len"].quantile(0.25)))
    df["valid_article"] = df["word_len"] >= min_article_len
    print(f"📏 MIN_ARTICLE_LEN set to {min_article_len} words")
    print(f"✅ Valid articles: {df['valid_article'].sum()} / {len(df)}")

    print("🔎 Computing cosine similarity...")
    df["max_sim"] = 0.0
    for idx, row in tqdm(df.iterrows(), total=len(df), desc="Scoring"):
        if row["valid_article"]:
            sim = max_sent_cosine(model, row[text_field], row["article"])
            df.at[idx, "max_sim"] = sim

    df["p_value"] = 1.0
    df["match"] = False
    valid_idx = df.index[df["valid_article"]]
    if len(valid_idx) > 0:
        # A single claim cannot provide a null distribution, so the permutation
        # test is skipped and a valid article is taken as a match.
        # NOTE(review): `max_sim` is computed above but does not influence `match`.
        print("⚠️ Only one claim provided, skipping null hypothesis test.")
        df.loc[valid_idx, "p_value"] = 0.0
        df.loc[valid_idx, "match"] = True  # Assume match for demo
    return df


def main(claim=None):
    """Run the fact-check pipeline for one claim and write both result CSVs.

    Args:
        claim: Claim text to check. When None (the default) the claim is read
            interactively with `input()`.

    Side effects:
        Prints progress, writes "scrutinizer_results.csv" and
        "factcheck_with_final_labels.csv" to the working directory.
    """
    TEXT_FIELD = "evidence"
    FINAL_OUTPUT_CSV = "scrutinizer_results.csv"
    FINAL_LABEL_OUTPUT = "factcheck_with_final_labels.csv"

    # === Step 1: Take Claim Input ===
    if claim is None:
        claim = input("Enter the claim to fact-check:\n")
    claim = claim.strip()
    if not claim:
        print("❌ No claim provided.")
        return

    # Step 2: Run Fact-checking
    try:
        result = process_claim(claim)
    except Exception as e:
        print(f"⚠️ Error during fact-checking: {e}")
        # Placeholder row so the rest of the pipeline still produces output files.
        result = {
            "claim": claim,
            "evidence": "ERROR",
            "url": "ERROR",
            "score": 0,
            "label": "Uncertain",
            "final_prompt": "ERROR",
            "attempts": 0,
            "score_trace": "[]"
        }

    fc_df = pd.DataFrame([result])
    print(f"\n🔍 Fact-Check Result:\n{fc_df[['claim', 'evidence', 'label', 'url', 'score']].to_string(index=False)}")

    # Step 3: Scrutinizer
    scr_df = _scrutinize_evidence(fc_df, TEXT_FIELD)
    scr_df.to_csv(FINAL_OUTPUT_CSV, index=False)
    print(f"✅ Scrutinizer results saved to {FINAL_OUTPUT_CSV}")

    # Step 4: Merge and Final Label Assignment
    # Left merge keeps the claim even when the scrutinizer dropped its row; such
    # rows have no match information and are treated as "not matched".
    merged = pd.merge(fc_df, scr_df[[TEXT_FIELD, 'match']], on=TEXT_FIELD, how='left')
    merged['match'] = [bool(m) if pd.notna(m) else False for m in merged['match']]
    merged['final_label'] = merged.apply(decide_label, axis=1)
    merged.to_csv(FINAL_LABEL_OUTPUT, index=False)
    print(f"✅ Final result saved to '{FINAL_LABEL_OUTPUT}'")

    print("\n🎯 Final Label:", merged['final_label'].iloc[0])


In [14]:
# Entry point: run the pipeline only when this code executes as the main module
# (which includes running the cell in a notebook), not when it is imported.
if __name__ == "__main__":
    main()


🔍 Fact-Check Result:
                                 claim                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            

Fetching: 100%|██████████| 1/1 [00:00<00:00,  1.18it/s]


📏 MIN_ARTICLE_LEN set to 532 words
✅ Valid articles: 1 / 1
🔎 Computing cosine similarity...


Scoring: 100%|██████████| 1/1 [00:00<00:00, 23.74it/s]

✅ Scrutinizer results saved to scrutinizer_results.csv
✅ Final result saved to 'factcheck_with_final_labels.csv'

🎯 Final Label: Strongly Refuted



