In [5]:
from __future__ import annotations
import argparse
import json
import math
import os
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


import numpy as np
import pandas as pd
from tqdm import tqdm

In [6]:
def safe_text(x: Optional[str]) -> str:
    return x if isinstance(x, str) else ""

In [7]:
def jaccard(a: str, b: str) -> float:
    sa = set(t.lower() for t in a.split())
    sb = set(t.lower() for t in b.split())
    if not sa and not sb:
        return 1.0
    if not sa or not sb:
        return 0.0
    return len(sa & sb) / len(sa | sb)

In [8]:
def rouge_l(hyp: str, ref: str) -> float:
    try:
        from rouge_score import rouge_scorer
        scorer = rouge_scorer.RougeScorer(['rougeL'], use_stemmer=True)
        score = scorer.score(ref, hyp)["rougeL"].fmeasure
        return float(score)
    except Exception:
        # fallback: use jaccard if rouge unavailable
        return jaccard(hyp, ref)

In [9]:
def minmax_scale(x: float, lo: float, hi: float) -> float:
    if hi <= lo:
        return 0.0
    return max(0.0, min(1.0, (x - lo) / (hi - lo)))

In [10]:
def choose_reference(src: str, tgt: str, ref_flag: str, hyp: str, sim_fn) -> Tuple[str, str]:
    """Return (chosen_reference, reference_type_used) based on ref flag.
    For 'either', we compute similarity to both and choose the more supportive one.
    """
    src = safe_text(src)
    tgt = safe_text(tgt)
    if ref_flag == "src":
        return src, "src"
    if ref_flag == "tgt":
        return tgt, "tgt"
    # either: compare both; pick the one closer to hyp
    s_src = sim_fn(hyp, src) if src else -1
    s_tgt = sim_fn(hyp, tgt) if tgt else -1
    if s_src >= s_tgt:
        return src, "src"
    else:
        return tgt, "tgt"

In [11]:
class Providers:
    def __init__(self, device: Optional[str] = None):
        self.device = device
        self._sbert = None
        self._nli_tok = None
        self._nli_model = None
        self._ppl_tok = None
        self._ppl_model = None
        self._lt = None
        self._nlp = None


    def sbert(self):
        if self._sbert is None:
            from sentence_transformers import SentenceTransformer
            self._sbert = SentenceTransformer("all-MiniLM-L6-v2")
        return self._sbert
    
    
    def nli(self):
        if self._nli_model is None:
            from transformers import AutoTokenizer, AutoModelForSequenceClassification
            self._nli_tok = AutoTokenizer.from_pretrained("roberta-large-mnli")
            self._nli_model = AutoModelForSequenceClassification.from_pretrained("roberta-large-mnli")
            if self.device:
                self._nli_model.to(self.device)
        return self._nli_tok, self._nli_model


    def ppl(self):
        if self._ppl_model is None:
            from transformers import AutoTokenizer, AutoModelForCausalLM
            self._ppl_tok = AutoTokenizer.from_pretrained("gpt2")
            self._ppl_model = AutoModelForCausalLM.from_pretrained("gpt2")
            if self.device:
                self._ppl_model.to(self.device)
        return self._ppl_tok, self._ppl_model


    def language_tool(self):
        if self._lt is None:
            import language_tool_python as ltp
            try:
                self._lt = ltp.LanguageTool("en-US")
            except Exception: 
                self._lt = None
        return self._lt


    def spacy(self):
        if self._nlp is None:
            import spacy
            try:
                self._nlp = spacy.load("en_core_web_sm")
            except Exception: 
                try:
                    import subprocess, sys
                    subprocess.run([sys.executable, "-m", "spacy", "download", "en_core_web_sm"], check=False)
                    self._nlp = spacy.load("en_core_web_sm")
                except Exception:
                    self._nlp = None
        return self._nlp

In [12]:
class SimilarityScorer:
    def __init__(self, providers: Providers):
        self.prov = providers
    
    
    def sim(self, a: str, b: str) -> float:
        a, b = safe_text(a), safe_text(b)
        if not a or not b:
            return 0.0
        try:
            model = self.prov.sbert()
            emb = model.encode([a, b], convert_to_tensor=True)
            # cosine similarity
            import torch
            sim = torch.nn.functional.cosine_similarity(emb[0:1], emb[1:2]).item()
            return float(sim)
        except Exception :
            return jaccard(a, b)

In [13]:
class NLIScorer:
    def __init__(self, providers: Providers):
        self.prov = providers
    
    
    def entailment_contradiction(self, premise: str, hypothesis: str) -> Tuple[float, float, float]:
        """Return probabilities: (entailment, neutral, contradiction)"""
        premise, hypothesis = safe_text(premise), safe_text(hypothesis)
        if not premise or not hypothesis:
            return 0.0, 1.0, 0.0
        try:
            tok, model = self.prov.nli()
            import torch
            batch = tok([premise], [hypothesis], return_tensors="pt", truncation=True)
            if self.prov.device:
                batch = {k: v.to(self.prov.device) for k, v in batch.items()}
            with torch.no_grad():
                logits = model(**batch).logits[0]
                probs = torch.softmax(logits, dim=-1).detach().cpu().numpy()
            # label order for roberta-large-mnli is: contradiction, neutral, entailment
            p_contra, p_neutral, p_entail = float(probs[0]), float(probs[1]), float(probs[2])
            return p_entail, p_neutral, p_contra
        except Exception:
            # Heuristic fallback using lexical cues
            jac = jaccard(premise, hypothesis)
            # crude mapping
            p_entail = jac
            p_contra = max(0.0, 1.0 - jac)
            p_neutral = 1.0 - max(p_entail, p_contra)
            return p_entail, p_neutral, p_contra

In [14]:
class LinguisticScorer:
    def __init__(self, providers: Providers, ppl_hi: float = 80.0):
        self.prov = providers
        self.ppl_hi = ppl_hi
    
    
    def perplexity(self, text: str) -> Optional[float]:
        text = safe_text(text)
        if not text:
            return None
        try:
            tok, model = self.prov.ppl()
            import torch
            enc = tok(text, return_tensors="pt")
            if self.prov.device:
                enc = {k: v.to(self.prov.device) for k, v in enc.items()}
            with torch.no_grad():
                outputs = model(**enc, labels=enc["input_ids"]) # LM loss
                loss = outputs.loss.item()
            ppl = math.exp(loss)
            return float(ppl)
        except Exception:
            return None


    def grammar_errors(self, text: str) -> Optional[int]:
        try:
            lt = self.prov.language_tool()
            if lt is None:
                return None
            matches = lt.check(text)
            return int(len(matches))
        except Exception:
            return None


    def score(self, hyp: str, ref: str) -> float:
        """Return linguistic hallucination score (0..1). Higher = worse language / drift."""
        # Low overlap and high perplexity / many grammar errors imply higher linguistic hallucination risk
        rl = rouge_l(hyp, ref) # 0..1, higher is better
        jac = jaccard(hyp, ref)
        ppl = self.perplexity(hyp)
        errs = self.grammar_errors(hyp)
        
        
        # Normalize to 0..1 risk components
        drift = 1.0 - max(rl, jac) # low overlap => higher drift
        ppl_risk = 0.0 if ppl is None else minmax_scale(ppl, lo=20.0, hi=self.ppl_hi)
        err_risk = 0.0 if errs is None else min(1.0, errs / 10.0) # cap at 10 errors
        
        
        # Weighted combination
        risk = 0.45 * drift + 0.35 * ppl_risk + 0.20 * err_risk
        return float(max(0.0, min(1.0, risk)))

In [15]:
class FactualScorer:
    def __init__(self, providers: Providers, wiki_check: bool = False):
        self.prov = providers
        self.wiki_check = wiki_check


    def entity_support_ratio(self, hyp: str, ref: str) -> Optional[float]:
        """Return ratio of HYP named entities that appear (string match) in REF.
        If no entities in hyp, return None (no factual claim detected).
        """
        nlp = self.prov.spacy()
        if nlp is None:
            return None
        hyp_doc = nlp(hyp)
        ref_low = ref.lower()
        ents = [e.text for e in hyp_doc.ents if e.label_ not in {"CARDINAL"}]
        if not ents:
            return None
        supported = 0
        for e in ents:
            if e.strip() and e.strip().lower() in ref_low:
                supported += 1
        return supported / max(1, len(ents))


    def score(self, hyp: str, ref: str, p_entail: float, p_contra: float) -> float:
        """Factual hallucination score (0..1). Higher = more likely factually wrong.
        Combines: low entity support, low entailment, high contradiction.
        """
        esr = self.entity_support_ratio(hyp, ref)
        # If we cannot compute ESR or there are no entities, rely on NLI only
        lack_support = 1.0 - esr if esr is not None else 0.5
        nli_risk = max(1.0 - p_entail, p_contra)
        risk = 0.55 * lack_support + 0.45 * nli_risk
        return float(max(0.0, min(1.0, risk)))

In [16]:
@dataclass
class Weights:
    w_ling: float = 0.20
    w_log: float = 0.35
    w_fact: float = 0.30
    w_ctx: float = 0.15
    
    
    def normalize(self):
        s = self.w_ling + self.w_log + self.w_fact + self.w_ctx
        if s <= 0:
            self.w_ling = self.w_log = self.w_fact = self.w_ctx = 0.25
            return
        self.w_ling /= s
        self.w_log /= s
        self.w_fact /= s
        self.w_ctx /= s

In [17]:
def score_row(row: Dict, prov: Providers, weights: Weights, sim_threshold: float, ppl_hi: float) -> Dict:
    sim_scorer = SimilarityScorer(prov)
    nli_scorer = NLIScorer(prov)
    ling_scorer = LinguisticScorer(prov, ppl_hi=ppl_hi)
    fact_scorer = FactualScorer(prov)
    ctx_scorer = ContextualScorer(sim_threshold=sim_threshold)


    hyp = safe_text(row.get("hyp"))
    src = safe_text(row.get("src"))
    tgt = safe_text(row.get("tgt"))
    ref_flag = safe_text(row.get("ref")) or "either"

 
    sim_fn = lambda a, b: sim_scorer.sim(a, b)
    ref_text, ref_used = choose_reference(src, tgt, ref_flag, hyp, sim_fn)


    sim = sim_fn(hyp, ref_text)
    p_entail, p_neutral, p_contra = nli_scorer.entailment_contradiction(ref_text, hyp)


    ling = ling_scorer.score(hyp, ref_text)
    logi = p_contra # logical hallucination proxied by contradiction prob
    fact = fact_scorer.score(hyp, ref_text, p_entail, p_contra)
    ctx = ctx_scorer.score(sim, p_entail, p_contra)


    weights.normalize()
    overall = (
        weights.w_ling * ling +
        weights.w_log * logi +
        weights.w_fact * fact +
        weights.w_ctx * ctx
    )
    label = "Hallucination" if overall >= 0.5 else "Not Hallucination"


    out = {
        "task": safe_text(row.get("task")),
        "ref_flag": ref_flag,
        "ref_used": ref_used,
        "sim": round(float(sim), 4),
        "p_entail": round(float(p_entail), 4),
        "p_neutral": round(float(p_neutral), 4),
        "p_contra": round(float(p_contra), 4),
        "linguistic_p": round(float(ling), 4),
        "logical_p": round(float(logi), 4),
        "factual_p": round(float(fact), 4),
        "contextual_p": round(float(ctx), 4),
        "hallucination_p": round(float(overall), 4),
        "pred_label": label,
    }
    # pass through ground truth if present (for evaluation)
    if "label" in row:
        out["gold_label"] = row.get("label")
    if "p(Hallucination)" in row:
        out["gold_p"] = row.get("p(Hallucination)")
    return out

In [18]:
def evaluate(df: pd.DataFrame) -> Dict[str, float]:
    from sklearn.metrics import accuracy_score, f1_score
    from scipy.stats import pearsonr, spearmanr
    
    
    if not {"gold_label", "pred_label"}.issubset(df.columns):
        return {}
    
    
    # Binary accuracy / F1
    y_true = (df["gold_label"] == "Hallucination").astype(int).values
    y_pred = (df["pred_label"] == "Hallucination").astype(int).values
    acc = float(accuracy_score(y_true, y_pred))
    f1 = float(f1_score(y_true, y_pred))
    
    
    # Correlation of probabilities
    pear, spear = None, None
    if "gold_p" in df.columns:
        try:
            pear = float(pearsonr(df["gold_p"].astype(float), df["hallucination_p"].astype(float))[0])
            spear = float(spearmanr(df["gold_p"].astype(float), df["hallucination_p"].astype(float))[0])
        except Exception:
            pass
    
    return {"accuracy": acc, "f1": f1, "pearson": pear, "spearman": spear}

In [19]:
def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("--input", required=True, help="Path to SHROOM JSON list file")
    ap.add_argument("--output", required=True, help="Where to write scored CSV")
    ap.add_argument("--device", default=None, help="e.g., cuda:0 or cpu")
    ap.add_argument("--evaluate", action="store_true", help="If labels exist, compute metrics")


# knobs
    ap.add_argument("--w_ling", type=float, default=0.20)
    ap.add_argument("--w_log", type=float, default=0.35)
    ap.add_argument("--w_fact", type=float, default=0.30)
    ap.add_argument("--w_ctx", type=float, default=0.15)
    ap.add_argument("--sim_threshold", type=float, default=0.70)
    ap.add_argument("--ppl_hi", type=float, default=80.0)


    args = ap.parse_args()


    with open(args.input, "r", encoding="utf-8") as f:
        data = json.load(f)


    df_in = pd.DataFrame(data)


    prov = Providers(device=args.device)
    weights = Weights(args.w_ling, args.w_log, args.w_fact, args.w_ctx)


    rows = []
    for row in tqdm(df_in.to_dict(orient="records"), total=len(df_in)):
        try:
            out = score_row(row, prov, weights, sim_threshold=args.sim_threshold, ppl_hi=args.ppl_hi)
        except Exception as e:
            out = {
                "task": row.get("task"),
                "ref_flag": row.get("ref"),
                "ref_used": None,
                "sim": np.nan,
                "p_entail": np.nan,
                "p_neutral": np.nan,
                "p_contra": np.nan,
                "linguistic_p": np.nan,
                "logical_p": np.nan,
                "factual_p": np.nan,
                "contextual_p": np.nan,
                "hallucination_p": np.nan,
                "pred_label": "ERROR",
                "error": str(e),
            }
            if "label" in row:
                out["gold_label"] = row.get("label")
            if "p(Hallucination)" in row:
                out["gold_p"] = row.get("p(Hallucination)")
        rows.append(out)


    df_out = pd.DataFrame(rows)
    df_out.to_csv(args.output, index=False)


    if args.evaluate and ("gold_label" in df_out.columns):
        metrics = evaluate(df_out)
        if metrics:
            print("\nEvaluation:")
            for k, v in metrics.items():
                if v is not None:
                    print(f" {k}: {v:.4f}")
        else:
            print("No evaluation performed (labels not present).")




    if __name__ == "__main__":
        main()

In [20]:
import nltk, spacy

# NLTK resources (you already downloaded, but safe to run again)
nltk.download("punkt")
nltk.download("averaged_perceptron_tagger")

# Load spaCy model
nlp = spacy.load("en_core_web_sm")
print("spaCy is ready ✅")


[nltk_data] Downloading package punkt to C:\Users\Nitish
[nltk_data]     Jangra\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     C:\Users\Nitish Jangra\AppData\Roaming\nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!


spaCy is ready ✅


In [22]:
import pandas as pd
import numpy as np
import nltk, spacy
from sklearn.metrics.pairwise import cosine_similarity
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
from sentence_transformers import SentenceTransformer

# --- Setup NLTK + spaCy ---
nltk.download("punkt")
nltk.download("averaged_perceptron_tagger")

try:
    nlp = spacy.load("en_core_web_sm")
except OSError:
    import spacy.cli
    spacy.cli.download("en_core_web_sm")
    nlp = spacy.load("en_core_web_sm")

# --- Core class ---
class HallucinationDetector:
    def __init__(self, sim_threshold=0.75, ppl_hi=60.0,
                 w_ling=0.25, w_log=0.25, w_fact=0.25, w_ctx=0.25):
        self.sim_threshold = sim_threshold
        self.ppl_hi = ppl_hi
        self.weights = {
            "linguistic": w_ling,
            "logical": w_log,
            "factual": w_fact,
            "contextual": w_ctx,
        }
        # Load embeddings model
        self.embedder = SentenceTransformer("all-MiniLM-L6-v2")
        # (Optional) for perplexity / fluency scoring
        self.tokenizer = AutoTokenizer.from_pretrained("t5-small")
        self.lm = AutoModelForSeq2SeqLM.from_pretrained("t5-small")

    def linguistic_score(self, hyp, tgt):
        # basic edit-distance style (can replace w/ BLEU, chrF, etc.)
        hyp_tokens = nltk.word_tokenize(hyp)
        tgt_tokens = nltk.word_tokenize(tgt)
        overlap = len(set(hyp_tokens) & set(tgt_tokens))
        return overlap / (len(set(tgt_tokens)) + 1e-5)

    def logical_score(self, hyp, src):
        # use embeddings cosine similarity
        emb = self.embedder.encode([hyp, src])
        sim = cosine_similarity([emb[0]], [emb[1]])[0][0]
        return sim

    def factual_score(self, hyp, ref):
        # placeholder: treat similarity to reference as factual alignment
        emb = self.embedder.encode([hyp, ref])
        sim = cosine_similarity([emb[0]], [emb[1]])[0][0]
        return sim

    def contextual_score(self, hyp, src):
        # cheap syntax complexity proxy
        doc = nlp(hyp)
        depth = max([token.dep_ for token in doc], default="")  # just presence of deps
        return 1.0 if depth else 0.5

    def classify(self, row):
        hyp, src, tgt, ref = row["hyp"], row["src"], row["tgt"], row["ref"]

        ling = self.linguistic_score(hyp, tgt)
        logi = self.logical_score(hyp, src)
        fact = self.factual_score(hyp, ref if ref else src)
        ctx  = self.contextual_score(hyp, src)

        # weighted sum
        score = (
            ling * self.weights["linguistic"] +
            logi * self.weights["logical"] +
            fact * self.weights["factual"] +
            ctx  * self.weights["contextual"]
        )

        label = "Hallucination" if score < self.sim_threshold else "Not Hallucination"

        return pd.Series({
            "linguistic_score": ling,
            "logical_score": logi,
            "factual_score": fact,
            "contextual_score": ctx,
            "p(Hallucination)": float(1 - score),
            "label": label
        })

# --- Convenience wrapper ---
def run_pipeline(input_path, output_path=None, evaluate=False):
    df = pd.read_json(input_path)
    detector = HallucinationDetector()

    scores = df.apply(detector.classify, axis=1)
    df_out = pd.concat([df, scores], axis=1)

    if output_path:
        df_out.to_csv(output_path, index=False)
        print(f"[+] Results saved to {output_path}")

    return df_out


  from .autonotebook import tqdm as notebook_tqdm
[nltk_data] Downloading package punkt to C:\Users\Nitish
[nltk_data]     Jangra\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     C:\Users\Nitish Jangra\AppData\Roaming\nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!


In [26]:
import nltk
nltk.download("punkt")
nltk.download("punkt_tab")


[nltk_data] Downloading package punkt to C:\Users\Nitish
[nltk_data]     Jangra\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package punkt_tab to C:\Users\Nitish
[nltk_data]     Jangra\AppData\Roaming\nltk_data...
[nltk_data]   Unzipping tokenizers\punkt_tab.zip.


True

In [27]:
df_train = run_pipeline("./data/train.model-agnostic.json", "./data/train_scored.csv")
df_train.head()

[+] Results saved to ./data/train_scored.csv


Unnamed: 0,hyp,tgt,src,ref,task,model,linguistic_score,logical_score,factual_score,contextual_score,p(Hallucination),label
0,"Don't worry, it's only temporary.",Don't worry. It's only temporary.,Не волнуйся. Это только временно.,either,MT,,0.874999,0.029491,0.184666,1.0,0.477711,Hallucination
1,Tom is never where he should be.,Tom is never where he's supposed to be.,"Тома никогда нет там, где он должен быть.",either,MT,,0.699999,-0.069661,0.152298,1.0,0.554341,Hallucination
2,It's hard for me to work with Tom.,I have trouble working with Tom.,Мне сложно работать с Томом.,either,MT,,0.428571,-0.120228,0.064242,1.0,0.656854,Hallucination
3,"Water, please.",I'd like some water.,"Воду, пожалуйста.",either,MT,,0.166666,0.109546,0.163692,1.0,0.640024,Hallucination
4,I didn't expect Tom to betray me.,I didn't think that Tom would betray me.,"Я не ожидал, что Том предаст меня.",either,MT,,0.699999,-0.051613,0.046931,1.0,0.576171,Hallucination


In [28]:
df_train

Unnamed: 0,hyp,tgt,src,ref,task,model,linguistic_score,logical_score,factual_score,contextual_score,p(Hallucination),label
0,"Don't worry, it's only temporary.",Don't worry. It's only temporary.,Не волнуйся. Это только временно.,either,MT,,0.874999,0.029491,0.184666,1.0,0.477711,Hallucination
1,Tom is never where he should be.,Tom is never where he's supposed to be.,"Тома никогда нет там, где он должен быть.",either,MT,,0.699999,-0.069661,0.152298,1.0,0.554341,Hallucination
2,It's hard for me to work with Tom.,I have trouble working with Tom.,Мне сложно работать с Томом.,either,MT,,0.428571,-0.120228,0.064242,1.0,0.656854,Hallucination
3,"Water, please.",I'd like some water.,"Воду, пожалуйста.",either,MT,,0.166666,0.109546,0.163692,1.0,0.640024,Hallucination
4,I didn't expect Tom to betray me.,I didn't think that Tom would betray me.,"Я не ожидал, что Том предаст меня.",either,MT,,0.699999,-0.051613,0.046931,1.0,0.576171,Hallucination
...,...,...,...,...,...,...,...,...,...,...,...,...
29995,"Yeah, I'm listening.",,"Yeah, I'm listening.",src,PG,,0.000000,1.000000,0.148736,1.0,0.462816,Hallucination
29996,Time?,,The time?,src,PG,,0.000000,0.838017,0.163360,1.0,0.499656,Hallucination
29997,Plague?,,A plague?,src,PG,,0.000000,0.921636,0.092533,1.0,0.496458,Hallucination
29998,"Tango, Tango.",,Tango.,src,PG,,0.000000,0.952118,0.179537,1.0,0.467086,Hallucination
