# Displacement Consistency: High-Performance Hallucination Detection




## Setup

In [None]:
# Uncomment to install dependencies
# !pip install -q datasets sentence-transformers spacy numpy pandas matplotlib seaborn scipy scikit-learn tqdm

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from dataclasses import dataclass
from typing import Optional, List, Dict, Tuple
from tqdm.auto import tqdm
import warnings
warnings.filterwarnings('ignore')

import spacy
from sentence_transformers import SentenceTransformer
from scipy import stats
from sklearn.metrics import roc_auc_score, roc_curve

from sgi import (
    compute_sgi,
    load_halueval_qa,
    print_dataset_summary,
    compute_effect_size,
    compute_cohens_d,
    set_publication_style,
)

set_publication_style()
print('Setup complete.')

In [None]:
# Load spacy for sentence tokenization.
# `nlp` is the module-level pipeline used by extract_claims().
try:
    nlp = spacy.load('en_core_web_sm')
except OSError:
    # spaCy reports a missing model package as OSError; anything else
    # (including Ctrl-C) should propagate instead of triggering a download.
    import subprocess
    import sys

    # Use the interpreter running this notebook so the model is installed
    # into the active environment; check=True surfaces a failed download
    # here rather than as a second, less informative load error.
    subprocess.run(
        [sys.executable, '-m', 'spacy', 'download', 'en_core_web_sm'],
        check=True,
    )
    nlp = spacy.load('en_core_web_sm')
print('Spacy loaded.')

## Configuration

In [None]:
# Configuration
# MODEL_NAME is the sentence-transformers model used for every embedding.
MODEL_NAME = 'all-mpnet-base-v2'
# Reduce for faster execution; increase for paper reproduction
MAX_SAMPLES = 1000

# Echo the active settings, one per line.
print(
    'Configuration:',
    '  Embedding Model: {}'.format(MODEL_NAME),
    '  MAX_SAMPLES: {}'.format(MAX_SAMPLES),
    sep='\n',
)

## Displacement Metrics Dataclass

In [None]:
@dataclass
class DisplacementMetrics:
    """Context-anchored displacement metrics for hallucination detection.

    One instance holds the scores computed for a single
    (question, context, response) case.
    """

    # Primary metric: Displacement Consistency
    displacement_consistency: float

    # Expected Response Deviation
    expected_response_deviation: float

    # Displacement Magnitude
    displacement_magnitude: float

    # Tangent Space Residual
    tangent_space_residual: float

    # Local SGI (sentence-level)
    local_sgi: float

    # Additional context metrics
    mean_claim_distance: float
    max_claim_distance: float
    trajectory_efficiency: float
    num_claims: int

    def to_dict(self) -> Dict:
        """Return the metric fields as a new ``{field name: value}`` dict.

        A shallow copy is returned rather than the instance's own
        ``__dict__``, so callers can add extra keys (an id, a label, ...)
        without attaching stray attributes to this object.
        """
        return dict(self.__dict__)

## Core Displacement Functions

In [None]:
def extract_claims(text: str) -> List[str]:
    """Split ``text`` into sentence-level claims.

    Sentences come from the module-level spaCy pipeline ``nlp``. A sentence
    is kept only if it is longer than 10 characters once surrounding
    whitespace is stripped. When no sentence qualifies, the first 200
    characters of ``text`` are returned as a single claim, so the result
    always contains at least one element.
    """
    stripped_sentences = (span.text.strip() for span in nlp(text).sents)
    kept = [sentence for sentence in stripped_sentences if len(sentence) > 10]
    if kept:
        return kept
    # Fallback: treat a (truncated) copy of the whole input as one claim.
    return [text[:200]]


def compute_displacement_consistency(context_emb: np.ndarray,
                                     claim_embeddings: List[np.ndarray]) -> float:
    """
    Measure how steadily successive claims move away from the context.

    Each claim is turned into a displacement vector (claim - context).
    For every pair of consecutive claims the cosine similarity of their
    displacement vectors is taken, and the mean of those cosines is the
    score.

    The working hypothesis of the notebook is that grounded responses keep
    a consistent direction (score near 1) while hallucinated ones wander
    (lower score).

    Returns:
        Mean cosine similarity between consecutive displacement vectors,
        a value in [-1, 1]. Returns 0.0 when fewer than two claims are
        given, or when every consecutive pair contains a displacement of
        (near-)zero length and is therefore skipped.
    """
    if len(claim_embeddings) < 2:
        return 0.0

    # Offsets of every claim relative to the context embedding
    offsets = [embedding - context_emb for embedding in claim_embeddings]

    cosines = []
    for previous, current in zip(offsets, offsets[1:]):
        previous_len = np.linalg.norm(previous)
        current_len = np.linalg.norm(current)

        # A (near-)zero offset has no direction; leave that pair out.
        if not (previous_len > 1e-8 and current_len > 1e-8):
            continue

        cosines.append(np.dot(previous / previous_len, current / current_len))

    if not cosines:
        return 0.0
    return np.mean(cosines)


def compute_expected_response_deviation(question_emb: np.ndarray,
                                        context_emb: np.ndarray,
                                        response_emb: np.ndarray,
                                        reference_pairs: List[Tuple[np.ndarray, np.ndarray]]) -> float:
    """
    Distance between the response and where the references predict it.

    The question-to-response offset is averaged over *all* entries of
    ``reference_pairs`` (each a ``(question_embedding, response_embedding)``
    tuple); no nearest-neighbour selection happens inside this function.
    Adding that mean offset to ``question_emb`` gives the predicted
    response location, and the Euclidean distance from ``response_emb`` to
    that prediction is returned.

    ``context_emb`` is accepted for signature symmetry with the other
    metrics but is not used in the computation.

    Returns 0.0 when ``reference_pairs`` is empty.
    """
    if not reference_pairs:
        return 0.0

    # Typical question -> response offset across the reference set
    offsets = [ref_response - ref_question
               for ref_question, ref_response in reference_pairs]
    typical_offset = np.mean(offsets, axis=0)

    # Where a response to this question would be expected to sit
    expected_location = question_emb + typical_offset

    return np.linalg.norm(response_emb - expected_location)


def compute_tangent_space_residual(context_emb: np.ndarray,
                                   response_emb: np.ndarray,
                                   reference_responses: List[np.ndarray],
                                   n_components: int = 3) -> float:
    """
    Compute residual after projection to local tangent space.

    The displacements (reference - context) of ``reference_responses`` are
    stacked into a matrix whose leading right-singular vectors form an
    orthonormal basis. The response displacement (response - context) is
    projected onto that basis; the norm of what is left over is returned.

    Args:
        context_emb: Embedding the displacements are measured from.
        response_emb: Embedding of the response being scored.
        reference_responses: Embeddings that define the subspace.
        n_components: Maximum number of principal directions to keep
            (capped at the number of singular values). Defaults to 3.

    Returns:
        Norm of the part of the response displacement not explained by the
        basis. 0.0 when fewer than two references are given or when the
        SVD fails to converge.

    Raises:
        ValueError: If ``n_components`` is smaller than 1.
    """
    if n_components < 1:
        raise ValueError('n_components must be at least 1')

    if len(reference_responses) < 2:
        return 0.0

    # Build local basis from reference displacements
    reference_disps = np.vstack([ref - context_emb for ref in reference_responses])

    # Only the SVD is guarded: a non-converging decomposition is the one
    # failure treated as "no usable basis". Other errors (and Ctrl-C) propagate.
    try:
        _, singular_values, vt = np.linalg.svd(reference_disps, full_matrices=False)
    except np.linalg.LinAlgError:
        return 0.0

    # Use top-k principal components (rows of vt are orthonormal directions)
    k = min(n_components, len(singular_values))
    basis = vt[:k].T

    # Project response displacement onto basis
    response_disp = response_emb - context_emb
    projection = basis @ (basis.T @ response_disp)

    # Residual is the part not explained by the basis
    return np.linalg.norm(response_disp - projection)


def compute_local_sgi(question_emb: np.ndarray,
                      context_emb: np.ndarray,
                      claim_embeddings: List[np.ndarray]) -> float:
    """
    Sentence-level SGI (Local SGI / LSGI).

    Calls ``compute_sgi(question_emb, context_emb, claim)`` once per claim
    embedding and returns the mean of the resulting ``.sgi`` values.
    An empty claim list yields 1.0.
    """
    if not claim_embeddings:
        return 1.0

    per_claim_sgi = [
        compute_sgi(question_emb, context_emb, embedding).sgi
        for embedding in claim_embeddings
    ]
    return np.mean(per_claim_sgi)

## Compute All Displacement Metrics

In [None]:
def compute_displacement_metrics(question: str,
                                  context: str,
                                  response: str,
                                  encoder: SentenceTransformer,
                                  reference_pairs: Optional[List[Tuple[np.ndarray, np.ndarray]]] = None,
                                  reference_responses: Optional[List[np.ndarray]] = None) -> Optional[DisplacementMetrics]:
    """
    Compute all displacement-based metrics for a single case.

    Args:
        question: Question text.
        context: Context (grounding) text.
        response: Response text to score; it is split into claims with
            ``extract_claims``.
        encoder: Model whose ``encode`` maps a string to one embedding and
            a list of strings to one embedding per item.
        reference_pairs: ``(question_embedding, response_embedding)`` tuples
            used for the expected-response deviation. ``None`` is treated
            as an empty list.
        reference_responses: Response embeddings used for the tangent-space
            residual. ``None`` is treated as an empty list.

    Returns:
        A ``DisplacementMetrics`` instance, or ``None`` if no claims were
        extracted from ``response``.
    """
    # Extract claims from response
    claims = extract_claims(response)
    if not claims:
        return None

    # Encode all texts
    question_emb = encoder.encode(question)
    context_emb = encoder.encode(context)
    response_emb = encoder.encode(response)
    # All claims go through the encoder in a single batched call instead of
    # one model invocation per sentence; iterating the result yields one
    # embedding per claim, in input order.
    claim_embeddings = list(encoder.encode(claims))

    # 1. Displacement Consistency (DC)
    dc = compute_displacement_consistency(context_emb, claim_embeddings)

    # 2. Expected Response Deviation (ERD)
    erd = compute_expected_response_deviation(
        question_emb, context_emb, response_emb,
        reference_pairs or []
    )

    # 3. Displacement Magnitude
    displacement_magnitude = np.linalg.norm(response_emb - context_emb)

    # 4. Tangent Space Residual (TSR)
    tsr = compute_tangent_space_residual(
        context_emb, response_emb,
        reference_responses or []
    )

    # 5. Local SGI (LSGI)
    lsgi = compute_local_sgi(question_emb, context_emb, claim_embeddings)

    # Additional metrics
    claim_distances = [np.linalg.norm(c - context_emb) for c in claim_embeddings]

    # Trajectory efficiency (direct distance / path length), where the path
    # runs context -> claim 1 -> claim 2 -> ... -> last claim.
    trajectory = [context_emb] + claim_embeddings
    path_length = sum(np.linalg.norm(nxt - cur)
                      for cur, nxt in zip(trajectory, trajectory[1:]))
    # `claims` is non-empty here, so the last claim always exists.
    direct_distance = np.linalg.norm(claim_embeddings[-1] - context_emb)
    # The epsilon keeps the ratio finite when every claim coincides with the context.
    efficiency = direct_distance / (path_length + 1e-8)

    return DisplacementMetrics(
        displacement_consistency=dc,
        expected_response_deviation=erd,
        displacement_magnitude=displacement_magnitude,
        tangent_space_residual=tsr,
        local_sgi=lsgi,
        mean_claim_distance=np.mean(claim_distances),
        max_claim_distance=np.max(claim_distances),
        trajectory_efficiency=efficiency,
        num_claims=len(claims)
    )

## Load Data and Run Analysis

In [None]:
# Load the evaluation cases (capped at MAX_SAMPLES) and print a summary
cases = load_halueval_qa(max_samples=MAX_SAMPLES)
print_dataset_summary(cases, 'HaluEval QA')

# Create the sentence encoder shared by every later cell, then report
# which model was loaded and the size of the embeddings it produces.
encoder = SentenceTransformer(MODEL_NAME)
print('\nEncoder loaded: {}'.format(MODEL_NAME))
print('Embedding dimension: {}'.format(encoder.get_sentence_embedding_dimension()))

In [None]:
# Build reference set from grounded samples (for ERD and TSR)
# NOTE(review): the references are the first 200 grounded entries of `cases`,
# and the same `cases` are scored afterwards, so those grounded cases are
# compared against a reference set that contains themselves. Consider a
# held-out split if this matters for the reported numbers -- TODO confirm.
print('Building reference set from grounded samples...')
grounded_cases = [c for c in cases if c.is_grounded][:200]

reference_pairs = []
reference_responses = []
reference_failures = 0
first_reference_error = None

for case in tqdm(grounded_cases, desc='Building references'):
    # Best effort: a case that cannot be encoded is skipped, but it is
    # counted and reported below instead of vanishing silently. Catching
    # Exception (not a bare except) lets Ctrl-C interrupt the loop.
    try:
        q_emb = encoder.encode(case.question)
        r_emb = encoder.encode(case.response)
    except Exception as exc:
        reference_failures += 1
        if first_reference_error is None:
            first_reference_error = repr(exc)
        continue

    reference_pairs.append((q_emb, r_emb))
    reference_responses.append(r_emb)

print(f'Reference set: {len(reference_pairs)} pairs')
if reference_failures:
    print(f'  Skipped {reference_failures} case(s) that could not be encoded '
          f'(first error: {first_reference_error})')

In [None]:
# Compute displacement metrics for all cases
results = []
failed_cases = []  # (case id, error description) for every skipped case

for case in tqdm(cases, desc='Computing displacement metrics'):
    try:
        metrics = compute_displacement_metrics(
            question=case.question,
            context=case.context,
            response=case.response,
            encoder=encoder,
            reference_pairs=reference_pairs,
            reference_responses=reference_responses
        )

        if metrics is None:
            continue

        # Work on a copy: extra keys added here must not end up on the
        # metrics object if to_dict() hands back its own attribute dict.
        result = dict(metrics.to_dict())
        result['id'] = case.id
        result['is_grounded'] = case.is_grounded
        results.append(result)

    except Exception as exc:
        # Best effort: keep going, but remember what was dropped and why.
        failed_cases.append((getattr(case, 'id', None), repr(exc)))
        continue

df = pd.DataFrame(results)
print(f'\nProcessed: {len(df)} samples')
if df.empty:
    # An empty frame has no 'is_grounded' column to count.
    print('  Grounded: 0')
    print('  Hallucinated: 0')
else:
    print(f'  Grounded: {df["is_grounded"].sum()}')
    print(f'  Hallucinated: {(~df["is_grounded"]).sum()}')
if failed_cases:
    first_id, first_error = failed_cases[0]
    print(f'  Skipped {len(failed_cases)} case(s) due to errors '
          f'(first: id={first_id}, {first_error})')

## Evaluate Metrics

In [None]:
# Define metrics to evaluate.
# Each entry is (column name in df, True if a higher value means "more hallucinated").
METRICS = [
    ('displacement_consistency', False),  # Higher = more grounded (invert for AUROC)
    ('expected_response_deviation', True),  # Higher = more hallucinated
    ('displacement_magnitude', True),  # Higher = more hallucinated
    ('tangent_space_residual', True),  # Higher = more hallucinated
    ('local_sgi', True),  # Higher = more hallucinated
]

print('='*80)
print('DISPLACEMENT METRICS EVALUATION')
print('='*80)
# The label with an apostrophe is bound to a name first: a backslash escape
# inside an f-string replacement field is a SyntaxError before Python 3.12.
cohens_d_label = "Cohen's d"
print(f'\n{"Method":<30} | {"Grounded":>10} | {"Halluc":>10} | {cohens_d_label:>10} | {"AUROC":>8}')
print('-'*80)

metric_results = []

# The labels are the same for every metric, so read them once.
labels = df['is_grounded'].values

for metric_name, higher_is_hallucinated in METRICS:
    values = df[metric_name].values

    result = compute_effect_size(values, labels, metric_name,
                                 positive_class_is_hallucinated=higher_is_hallucinated)

    print(f'{metric_name:<30} | {result.grounded_mean:>10.4f} | {result.hallucinated_mean:>10.4f} | '
          f'{result.cohens_d:>+10.3f} | {result.auroc:>8.4f}')

    # Keep a tabular record of each metric's summary statistics.
    metric_results.append({
        'Method': metric_name,
        'Grounded Mean': result.grounded_mean,
        'Hallucinated Mean': result.hallucinated_mean,
        "Cohen's d": result.cohens_d,
        'AUROC': result.auroc,
        'p-value': result.p_value,
    })

print('-'*80)

## Comparison with Pre-computed Results

Load and display the pre-computed results from the `results/` directory.

In [None]:
# Load pre-computed results
import os

precomputed_results = []
results_dir = '../results'

# Expected layout: <results_dir>/<experiment>/<...geometric_methods_summary...>.csv
if os.path.isdir(results_dir):
    for exp_dir in sorted(os.listdir(results_dir)):
        exp_path = os.path.join(results_dir, exp_dir)
        # Stray files next to the experiment folders (README, .DS_Store, ...)
        # cannot be listed as directories; skip them.
        if not os.path.isdir(exp_path):
            continue

        csv_files = [f for f in os.listdir(exp_path)
                     if f.endswith('.csv') and 'geometric_methods_summary' in f]
        for csv_file in csv_files:
            csv_path = os.path.join(exp_path, csv_file)
            try:
                df_precomputed = pd.read_csv(csv_path)
            except (OSError, ValueError) as exc:
                # Unreadable, empty or malformed file (pandas' parser and
                # empty-data errors derive from ValueError): report and move on.
                print(f'Skipping unreadable results file {csv_path}: {exc}')
                continue
            # Remember which experiment folder each row came from.
            df_precomputed['Source'] = exp_dir
            precomputed_results.append(df_precomputed)

if precomputed_results:
    print('='*80)
    print('PRE-COMPUTED RESULTS FROM results/ DIRECTORY')
    print('='*80)

    combined_precomputed = pd.concat(precomputed_results, ignore_index=True)

    # Aggregate by method
    summary = combined_precomputed.groupby('Method').agg({
        'Cross-Domain AUROC': ['mean', 'std'],
        'Within-Domain AUROC': ['mean', 'std']
    }).round(4)

    print('\nMethod Performance Summary (across embedding models):')
    print(summary.to_string())
else:
    print('No pre-computed results found in results/ directory.')

## Visualizations

In [None]:
# ROC curves for all metrics
# Size the grid from the number of metrics (3 per row) so that every metric
# gets a panel; with the five metrics defined in this notebook this is 2x3 at 15x10.
n_cols = 3
n_rows = max(1, (len(METRICS) + n_cols - 1) // n_cols)
fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 5 * n_rows))
axes = axes.flatten()

colors = plt.cm.Set2(np.linspace(0, 1, len(METRICS)))

# Positive class = hallucinated; identical for every metric.
y_true = (~df['is_grounded']).astype(int).values

for ax, (metric_name, higher_is_hallucinated), color in zip(axes, METRICS, colors):
    values = df[metric_name].values

    # Flip metrics where a higher value means "more grounded", so that a
    # larger score always points towards the positive (hallucinated) class.
    if not higher_is_hallucinated:
        values = -values

    fpr, tpr, _ = roc_curve(y_true, values)
    auroc = roc_auc_score(y_true, values)

    ax.plot(fpr, tpr, color=color, linewidth=2, label=f'AUC = {auroc:.4f}')
    ax.plot([0, 1], [0, 1], 'k--', alpha=0.5)  # chance diagonal
    ax.set_xlabel('False Positive Rate')
    ax.set_ylabel('True Positive Rate')
    ax.set_title(metric_name.replace('_', ' ').title())
    ax.legend(loc='lower right')
    ax.grid(True, alpha=0.3)

# Hide only the panels no metric was drawn on (never a panel holding a curve)
for unused_ax in axes[len(METRICS):]:
    unused_ax.axis('off')

fig.suptitle('ROC Curves for Displacement Metrics', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.savefig('displacement_roc_curves.png', dpi=300, bbox_inches='tight')
plt.show()

In [None]:
# Distribution comparison
# Size the grid from the number of metrics (3 per row) so that every metric
# gets a panel; with the five metrics defined in this notebook this is 2x3 at 15x10.
n_cols = 3
n_rows = max(1, (len(METRICS) + n_cols - 1) // n_cols)
fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 5 * n_rows))
axes = axes.flatten()

# Row selector for grounded cases; its negation selects hallucinated ones.
grounded_mask = df['is_grounded']

for ax, (metric_name, _) in zip(axes, METRICS):
    grounded = df.loc[grounded_mask, metric_name]
    hallucinated = df.loc[~grounded_mask, metric_name]

    # density=True normalises each histogram, so the two classes are
    # comparable even when their sample counts differ.
    ax.hist(grounded, bins=30, alpha=0.6, label='Grounded', color='#2ecc71', density=True)
    ax.hist(hallucinated, bins=30, alpha=0.6, label='Hallucinated', color='#e74c3c', density=True)

    ax.set_xlabel(metric_name.replace('_', ' ').title())
    ax.set_ylabel('Density')
    ax.legend()
    ax.grid(True, alpha=0.3)

# Hide only the panels no metric was drawn on (never a panel holding a histogram)
for unused_ax in axes[len(METRICS):]:
    unused_ax.axis('off')

fig.suptitle('Distribution of Displacement Metrics', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.savefig('displacement_distributions.png', dpi=300, bbox_inches='tight')
plt.show()