# Consistency Evaluation - Self Matching Analysis

This notebook performs a consistency evaluation of the InterpDetect project, checking:
1. **CS1**: Whether conclusions match the original recorded results
2. **CS2**: Whether implementation follows the plan

## Project Overview
The InterpDetect project implements a mechanistic interpretability-based hallucination detection method for RAG systems using:
- External Context Score (ECS) - measures attention to external context
- Parametric Knowledge Score (PKS) - measures FFN contribution via Jensen-Shannon divergence


In [None]:
import os
import json
import glob
import pickle
import collections
import numpy as np
import pandas as pd
from scipy.stats import pearsonr
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_recall_fscore_support, confusion_matrix
import torch

# Set working directory
# Both locations default to the paths of the original evaluation host but can
# be overridden with environment variables so the notebook is runnable on
# other machines without editing code.
os.chdir(os.environ.get('EVAL_AGENT_DIR', '/home/smallyan/eval_agent'))
repo_path = os.environ.get('INTERPDETECT_REPO', '/net/scratch2/smallyan/InterpDetect_eval')
# Fail here with a clear message instead of much later with an IndexError on
# an empty example list.
if not os.path.isdir(repo_path):
    raise FileNotFoundError(f"InterpDetect repository not found at {repo_path}")

# Informational only: nothing in this cell moves data onto the GPU.
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"CUDA device: {torch.cuda.get_device_name(0)}")

## Load Training Data

In [None]:
# Load training data
# Every *.json file under datasets/train is read and its contents are
# concatenated into the flat `examples` list (each file is assumed to hold a
# JSON array of example objects -- TODO confirm against the dataset).
folder_path = os.path.join(repo_path, "datasets/train")
examples = []
# glob returns paths in filesystem-dependent order; sorting makes the order of
# `examples` -- and therefore the seeded sampling / train-val split derived
# from it later -- reproducible across machines.
json_files = sorted(glob.glob(os.path.join(folder_path, "*.json")))
if not json_files:
    raise FileNotFoundError(f"No training JSON files found in {folder_path}")

for file_path in json_files:
    # JSON is UTF-8 by specification; do not depend on the locale default.
    with open(file_path, "r", encoding="utf-8") as f:
        data = json.load(f)
        examples.extend(data)

print(f"Loaded {len(examples)} examples from {len(json_files)} files")

## CS1: Results vs Conclusions Analysis

### Claim 1: ECS Correlation Analysis
**Plan states**: "All attention heads exhibit negative correlations; hallucinated responses utilize less external context than truthful ones."


In [None]:
# Separate ECS and PKS data by hallucination label
# Each dict maps a score key (a key of 'prompt_attention_score' or
# 'parameter_knowledge_scores') to the list of values seen for that key,
# split by the hallucination label of the scored item.
ecs_truthful = collections.defaultdict(list)
ecs_hallucinated = collections.defaultdict(list)
pks_truthful = collections.defaultdict(list)
pks_hallucinated = collections.defaultdict(list)

for example in examples:
    for score in example['scores']:
        # Label 0 feeds the truthful buckets; every other label value is
        # treated as hallucinated.
        if score['hallucination_label'] == 0:
            ecs_bucket, pks_bucket = ecs_truthful, pks_truthful
        else:
            ecs_bucket, pks_bucket = ecs_hallucinated, pks_hallucinated
        for k, v in score['prompt_attention_score'].items():
            ecs_bucket[k].append(v)
        for k, v in score['parameter_knowledge_scores'].items():
            pks_bucket[k].append(v)

print(f"Number of attention heads: {len(ecs_truthful)}")
print(f"Number of FFN layers: {len(pks_truthful)}")
# next(..., []) reports 0 instead of raising IndexError when a class has no
# items at all (e.g. a dataset containing only one label).
print(f"Truthful spans: {len(next(iter(ecs_truthful.values()), []))}")
print(f"Hallucinated spans: {len(next(iter(ecs_hallucinated.values()), []))}")

In [None]:
# Compute ECS vs Hallucination correlation
def pearson_corr(attention_scores, hallucination_labels, inverse=False):
    """Pearson correlation between a score sequence and binary labels.

    Args:
        attention_scores: sequence of numeric scores; converted to float.
        hallucination_labels: sequence of labels; converted to int.
        inverse: when True, correlate against ``1 - labels`` instead of the
            labels themselves (which flips the sign of the coefficient).

    Returns:
        A ``(r, p_value)`` tuple as computed by ``scipy.stats.pearsonr``.
    """
    score_arr = np.array(attention_scores, dtype=float)
    target = np.array(hallucination_labels, dtype=int)

    # Flip the labels up front so there is a single pearsonr call site.
    if inverse:
        target = 1 - target

    coefficient, p_value = pearsonr(score_arr, target)
    return coefficient, p_value

# Aggregate ECS data for correlation
# For each head: truthful scores first (label 0), then hallucinated scores
# (label 1), with a parallel list holding the labels.
ecs_lst = collections.defaultdict(list)
ecs_label_lst = collections.defaultdict(list)
for head, truthful_scores in ecs_truthful.items():
    hallucinated_scores = ecs_hallucinated[head]
    ecs_lst[head].extend(truthful_scores)
    ecs_label_lst[head].extend([0] * len(truthful_scores))
    ecs_lst[head].extend(hallucinated_scores)
    ecs_label_lst[head].extend([1] * len(hallucinated_scores))

# Compute ECS correlations (ECS vs Hallucination directly)
# Only the coefficient is kept; the p-value is discarded.
ecs_pcc_direct = {
    head: pearson_corr(scores, ecs_label_lst[head], inverse=False)[0]
    for head, scores in ecs_lst.items()
}

# Count positive and negative correlations
# A coefficient of exactly 0 (or NaN) lands in neither count.
positive_corr = len([c for c in ecs_pcc_direct.values() if c > 0])
negative_corr = len([c for c in ecs_pcc_direct.values() if c < 0])
total_heads = len(ecs_pcc_direct)
ecs_verdict = 'MATCHES' if negative_corr == total_heads else 'DOES NOT MATCH'

print("ECS Correlation Analysis (ECS vs Hallucination Label):")
print(f"  Positive correlations: {positive_corr}")
print(f"  Negative correlations: {negative_corr}")
print(f"  Total heads: {total_heads}")
print("\nClaim: 'All attention heads exhibit negative correlations'")
print(f"Result: {ecs_verdict}")

### Claim 2: PKS Correlation Analysis
**Plan states**: "Later-layer FFNs exhibit substantially higher PKS for hallucinated responses and are positively correlated with hallucinations."


In [None]:
# Compute PKS vs Hallucination correlation
# For each layer key: truthful scores (label 0) followed by hallucinated
# scores (label 1), with a parallel list of labels.
pks_lst = collections.defaultdict(list)
pks_label_lst = collections.defaultdict(list)
for layer, truthful_scores in pks_truthful.items():
    hallucinated_scores = pks_hallucinated[layer]
    pks_lst[layer].extend(truthful_scores)
    pks_label_lst[layer].extend([0] * len(truthful_scores))
    pks_lst[layer].extend(hallucinated_scores)
    pks_label_lst[layer].extend([1] * len(hallucinated_scores))

# Compute PKS correlations
# Only the coefficient is kept; the p-value is discarded.
pks_pcc = {
    layer: pearson_corr(scores, pks_label_lst[layer], inverse=False)[0]
    for layer, scores in pks_lst.items()
}

# Sort by layer number
# The integer after the first underscore of the key is the sort key.
sorted_pks = sorted(pks_pcc.items(), key=lambda item: int(item[0].split('_')[1]))

# Compare early vs later layers
# The split is fixed at position 14 of the sorted list.
early_layers = [corr for _, corr in sorted_pks[:14]]  # layers 0-13
later_layers = [corr for _, corr in sorted_pks[14:]]  # layers 14-27
mean_early = np.mean(early_layers)
mean_later = np.mean(later_layers)
pks_verdict = 'MATCHES' if mean_later > mean_early else 'DOES NOT MATCH'

print("PKS Correlation Analysis:")
print(f"  Mean correlation - Early layers (0-13): {mean_early:.4f}")
print(f"  Mean correlation - Later layers (14-27): {mean_later:.4f}")
print("\nClaim: 'Later-layer FFNs exhibit higher PKS correlation with hallucinations'")
print(f"Result: {pks_verdict}")

### Claim 3: Classifier Performance
**Plan states**: "SVC achieved highest validation F1 (76.60%) and was selected; XGBoost overfitted despite strong training performance."


In [None]:
# Prepare data for classifier evaluation
# Column names are taken from the first scored item of the first example;
# every other item is indexed with the same keys.
ATTENTION_COLS = list(examples[0]['scores'][0]['prompt_attention_score'].keys())
PARAMETER_COLS = list(examples[0]['scores'][0]['parameter_knowledge_scores'].keys())

# Column-oriented buffer: one list per output column, one entry per scored item.
data_dict = {
    "identifier": [],
    **{col: [] for col in ATTENTION_COLS},
    **{col: [] for col in PARAMETER_COLS},
    "hallucination_label": []
}

for i, resp in enumerate(examples):
    for j, item_scores in enumerate(resp["scores"]):
        # The identifier encodes (response index, item index).
        data_dict["identifier"].append(f"response_{i}_item_{j}")
        for col in ATTENTION_COLS:
            data_dict[col].append(item_scores['prompt_attention_score'][col])
        for col in PARAMETER_COLS:
            data_dict[col].append(item_scores['parameter_knowledge_scores'][col])
        data_dict["hallucination_label"].append(item_scores["hallucination_label"])

df = pd.DataFrame(data_dict)

# Balance and split
# Down-sample every label group to the size of the smallest one. Each group is
# sampled independently with the same seed and concatenated in sorted label
# order, which selects the same rows in the same order as the previous
# groupby.apply formulation. Unlike that formulation it keeps the label column
# and does not depend on the pandas-2.2-only `include_groups` argument.
min_count = df['hallucination_label'].value_counts().min()
df_balanced = pd.concat(
    [group.sample(min_count, random_state=42) for _, group in df.groupby('hallucination_label')]
).reset_index(drop=True)

# NOTE(review): this 90/10 split is re-created locally; confirm it matches the
# split used when the pickled models were trained, otherwise the "validation"
# rows may have been seen during training.
train, val = train_test_split(df_balanced, test_size=0.1, random_state=42, stratify=df_balanced['hallucination_label'])
features = [col for col in df_balanced.columns if col not in ['identifier', 'hallucination_label']]

X_train, y_train = train[features], train["hallucination_label"]
X_val, y_val = val[features], val["hallucination_label"]

print(f"Train set: {len(X_train)} samples, Validation set: {len(X_val)} samples")

In [None]:
# Evaluate pre-trained models
# Every pickled classifier is scored on the validation split built above.
import warnings
warnings.filterwarnings('ignore')

models_path = os.path.join(repo_path, "trained_models")
model_names = ["LR", "SVC", "RandomForest", "XGBoost"]
model_results = {}

for name in model_names:
    pickle_path = os.path.join(models_path, f"model_{name}_3000.pickle")
    # NOTE: unpickling executes arbitrary code -- only load model files from a
    # trusted checkout of the repository.
    with open(pickle_path, "rb") as handle:
        model = pickle.load(handle)

    # The fourth returned element (support) is not used.
    metrics = precision_recall_fscore_support(y_val, model.predict(X_val), average='binary')
    model_results[name] = dict(zip(('precision', 'recall', 'f1'), metrics[:3]))

# Best model by validation F1 (first one wins on ties).
best_name, best_metrics = max(model_results.items(), key=lambda item: item[1]['f1'])

print("Classifier Performance Comparison:")
print("="*50)
for name in model_names:
    print(f"{name}: F1 = {model_results[name]['f1']*100:.2f}%")
print("="*50)
print("\nClaim: 'SVC achieved highest validation F1 (76.60%)'")
print(f"Actual highest: {best_name} with {best_metrics['f1']*100:.2f}%")
# Only the identity of the best model is checked here, not the 76.60% figure.
print(f"Result: {'MATCHES' if best_name == 'SVC' else 'DOES NOT MATCH'}")

### Claims 4 & 5: Detection Performance
**Plan states**: 
- Self-Evaluation: "Method achieved F1=74.68%"
- Proxy-Based: "Method achieved F1=75.36%"


In [None]:
# Load SVC model for response-level evaluation
svc_model_path = os.path.join(models_path, "model_SVC_3000.pickle")
# NOTE: unpickling executes arbitrary code -- only load trusted model files.
with open(svc_model_path, "rb") as f:
    svc_model = pickle.load(f)

# Self-Evaluation (Qwen test data)
test_qwen_path = os.path.join(repo_path, "datasets/test/test_w_chunk_score_qwen06b.json")
with open(test_qwen_path, "r") as f:
    test_qwen = json.load(f)

# Column-oriented buffer with the same columns as the training frame.
test_qwen_dict = {"identifier": [], **{col: [] for col in ATTENTION_COLS},
                  **{col: [] for col in PARAMETER_COLS}, "hallucination_label": []}
for i, resp in enumerate(test_qwen):
    for j, item_scores in enumerate(resp["scores"]):
        test_qwen_dict["identifier"].append(f"response_{i}_item_{j}")
        for col in ATTENTION_COLS:
            test_qwen_dict[col].append(item_scores['prompt_attention_score'][col])
        for col in PARAMETER_COLS:
            test_qwen_dict[col].append(item_scores['parameter_knowledge_scores'][col])
        test_qwen_dict["hallucination_label"].append(item_scores["hallucination_label"])

test_qwen_df = pd.DataFrame(test_qwen_dict)
test_qwen_df['pred'] = svc_model.predict(test_qwen_df[features])
# Recover the response id from the identifier; expand=False yields a Series.
test_qwen_df["response_id"] = test_qwen_df["identifier"].str.extract(r"(response_\d+)_item_\d+", expand=False)
# A response counts as hallucinated (label or prediction) when any of its
# items does, hence the per-response max.
agg_qwen = test_qwen_df.groupby("response_id").agg({"pred": "max", "hallucination_label": "max"}).reset_index()

# labels=[0, 1] forces a 2x2 matrix even when only one class is present, so the
# four-way unpacking cannot fail.
tn, fp, fn, tp = confusion_matrix(agg_qwen["hallucination_label"], agg_qwen["pred"], labels=[0, 1]).ravel()
# F1 = 2TP / (2TP + FP + FN), algebraically equal to the harmonic mean of
# precision and recall; defined as 0 when there are no positives at all
# instead of dividing by zero.
f1_denominator = 2 * tp + fp + fn
f1_self = float(2 * tp / f1_denominator) if f1_denominator else 0.0
print(f"Self-Evaluation F1: {f1_self*100:.2f}% (Claimed: 74.68%)")
print(f"Result: {'MATCHES' if abs(f1_self*100 - 74.68) < 0.1 else 'DOES NOT MATCH'}")

In [None]:
# Proxy-Based Evaluation (GPT-4.1-mini test data)
test_gpt_path = os.path.join(repo_path, "datasets/test/test_w_chunk_score_gpt41mini.json")
with open(test_gpt_path, "r") as f:
    test_gpt = json.load(f)

# Column-oriented buffer with the same columns as the training frame.
test_gpt_dict = {"identifier": [], **{col: [] for col in ATTENTION_COLS},
                 **{col: [] for col in PARAMETER_COLS}, "hallucination_label": []}
for i, resp in enumerate(test_gpt):
    for j, item_scores in enumerate(resp["scores"]):
        test_gpt_dict["identifier"].append(f"response_{i}_item_{j}")
        for col in ATTENTION_COLS:
            test_gpt_dict[col].append(item_scores['prompt_attention_score'][col])
        for col in PARAMETER_COLS:
            test_gpt_dict[col].append(item_scores['parameter_knowledge_scores'][col])
        test_gpt_dict["hallucination_label"].append(item_scores["hallucination_label"])

test_gpt_df = pd.DataFrame(test_gpt_dict)
test_gpt_df['pred'] = svc_model.predict(test_gpt_df[features])
# Recover the response id from the identifier; expand=False yields a Series.
test_gpt_df["response_id"] = test_gpt_df["identifier"].str.extract(r"(response_\d+)_item_\d+", expand=False)
# A response counts as hallucinated (label or prediction) when any of its
# items does, hence the per-response max.
agg_gpt = test_gpt_df.groupby("response_id").agg({"pred": "max", "hallucination_label": "max"}).reset_index()

# labels=[0, 1] forces a 2x2 matrix even when only one class is present, so the
# four-way unpacking cannot fail.
tn, fp, fn, tp = confusion_matrix(agg_gpt["hallucination_label"], agg_gpt["pred"], labels=[0, 1]).ravel()
# F1 = 2TP / (2TP + FP + FN), algebraically equal to the harmonic mean of
# precision and recall; defined as 0 when there are no positives at all
# instead of dividing by zero.
f1_denominator = 2 * tp + fp + fn
f1_proxy = float(2 * tp / f1_denominator) if f1_denominator else 0.0
print(f"Proxy-Based Evaluation F1: {f1_proxy*100:.2f}% (Claimed: 75.36%)")
print(f"Result: {'MATCHES' if abs(f1_proxy*100 - 75.36) < 0.1 else 'DOES NOT MATCH'}")

## CS2: Plan vs Implementation Analysis

Verifying that all methodology steps from the plan are implemented in the code.


In [None]:
# Read implementation files
# The three scripts are read once as plain text; the checks below are pure
# substring searches over that text.
scripts_path = os.path.join(repo_path, 'scripts')
script_text = {}
for script_name in ('compute_scores.py', 'classifier.py', 'predict.py'):
    with open(os.path.join(scripts_path, script_name), 'r') as handle:
        script_text[script_name] = handle.read()
compute_scores_content = script_text['compute_scores.py']
classifier_content = script_text['classifier.py']
predict_content = script_text['predict.py']

# Verify each plan step
# Each entry is True when the expected identifier text occurs in the script
# (or, for the test data, when the file exists). This shows that the text is
# present, not that the step is implemented correctly.
cs2_checks = {
    "Step1_ECS": {
        "attention_weights": "outputs.attentions" in compute_scores_content,
        "cosine_similarity": "calculate_sentence_similarity" in compute_scores_content,
    },
    "Step2_PKS": {
        "jensen_shannon": "calculate_dist_2d" in compute_scores_content,
        "kl_divergence": "F.kl_div" in compute_scores_content,
    },
    "Step3_TransformerLens": {
        "hooked_transformer": "HookedTransformer" in compute_scores_content,
        "run_with_cache": "run_with_cache" in compute_scores_content,
    },
    "Step4_Classifiers": {
        "LR": "LogisticRegression" in classifier_content,
        "SVC": "SVC" in classifier_content,
        "RF": "RandomForestClassifier" in classifier_content,
        "XGB": "XGBClassifier" in classifier_content,
    },
    "Step5_Evaluation": {
        "response_level": "response_id" in predict_content,
        "test_data_qwen": os.path.exists(os.path.join(repo_path, "datasets/test/test_w_chunk_score_qwen06b.json")),
        "test_data_gpt": os.path.exists(os.path.join(repo_path, "datasets/test/test_w_chunk_score_gpt41mini.json")),
    }
}

# A step passes only when every one of its checks is True.
step_verdicts = {step: all(checks.values()) for step, checks in cs2_checks.items()}
all_pass = all(step_verdicts.values())

print("CS2: Plan vs Implementation Verification")
print("="*50)
for step, checks in cs2_checks.items():
    print(f"\n{step}: {'PASS' if step_verdicts[step] else 'FAIL'}")
    for check, result in checks.items():
        mark = '✓' if result else '✗'
        print(f"  {mark} {check}")

print(f"\n{'='*50}")
print(f"CS2 Overall: {'PASS' if all_pass else 'FAIL'}")

## Summary

### CS1: Results vs Conclusions


In [None]:
# Final CS1 Summary
# Collects the verdict for each claim checked above. Each entry holds the
# claim text, a boolean `verified`, and a `details` string with the numbers
# actually measured in this run.

# Best classifier by validation F1, computed once (first one wins on ties).
best_name, best_metrics = max(model_results.items(), key=lambda item: item[1]['f1'])

cs1_results = {
    "ECS_Correlation": {
        "claim": "All attention heads exhibit negative correlations",
        "verified": negative_corr == len(ecs_pcc_direct),
        # Report the measured count; the previous wording said "All N heads"
        # even when the check had failed.
        "details": f"{negative_corr} of {len(ecs_pcc_direct)} heads show negative correlation"
    },
    "PKS_Correlation": {
        "claim": "Later-layer FFNs have higher positive correlation",
        "verified": np.mean(later_layers) > np.mean(early_layers),
        "details": f"Early: {np.mean(early_layers):.4f}, Later: {np.mean(later_layers):.4f}"
    },
    "Classifier_Selection": {
        "claim": "SVC achieved highest validation F1 (76.60%)",
        # Only the identity of the best model is checked, not the 76.60% value.
        "verified": best_name == 'SVC',
        "details": f"Best: {best_name} with {best_metrics['f1']*100:.2f}%"
    },
    "Self_Evaluation": {
        "claim": "Method achieved F1=74.68%",
        # Tolerance of 0.1 percentage points around the claimed value.
        "verified": abs(f1_self*100 - 74.68) < 0.1,
        "details": f"Actual: {f1_self*100:.2f}%"
    },
    "Proxy_Evaluation": {
        "claim": "Method achieved F1=75.36%",
        "verified": abs(f1_proxy*100 - 75.36) < 0.1,
        "details": f"Actual: {f1_proxy*100:.2f}%"
    }
}

print("CS1: Results vs Conclusions")
print("="*60)
for key, result in cs1_results.items():
    status = "PASS" if result['verified'] else "FAIL"
    print(f"\n{key}: {status}")
    print(f"  Claim: {result['claim']}")
    print(f"  Details: {result['details']}")

# CS1 passes only when every individual claim is verified.
cs1_pass = all(r['verified'] for r in cs1_results.values())
print(f"\n{'='*60}")
print(f"CS1 Overall: {'PASS' if cs1_pass else 'FAIL'}")

### Binary Checklist Summary

In [None]:
# Final binary checklist: one PASS/FAIL line per criterion, followed by the
# individual failing items for any criterion that failed.
print("="*60)
print("BINARY CHECKLIST")
print("="*60)
print(f"\nCS1 (Results vs Conclusions): {'PASS' if cs1_pass else 'FAIL'}")
print(f"CS2 (Plan vs Implementation): {'PASS' if all_pass else 'FAIL'}")
print("="*60)

if not cs1_pass:
    print("\nCS1 FAIL Reason:")
    for key, result in cs1_results.items():
        if not result['verified']:
            print(f"  - {key}: {result['details']}")

# Mirror the CS1 report so that a CS2 failure is explained as well.
if not all_pass:
    print("\nCS2 FAIL Reason:")
    for step, checks in cs2_checks.items():
        for check, passed in checks.items():
            if not passed:
                print(f"  - {step}: {check}")