In [None]:
# # This Python 3 environment comes with many helpful analytics libraries installed
# # It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# # For example, here's several helpful packages to load

# import numpy as np # linear algebra
# import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# # Input data files are available in the read-only "../input/" directory
# # For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

# import os
# for dirname, _, filenames in os.walk('/kaggle/input'):
#     for filename in filenames:
#         print(os.path.join(dirname, filename))

# # You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# # You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [1]:
!nvidia-smi

Mon Nov 17 14:39:34 2025       
+---------------------------------------------------------------------------------------+
| NVIDIA-SMI 545.23.08              Driver Version: 545.23.08    CUDA Version: 12.3     |
|-----------------------------------------+----------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |         Memory-Usage | GPU-Util  Compute M. |
|                                         |                      |               MIG M. |
|   0  Tesla V100-SXM2-16GB           Off | 00000000:1D:00.0 Off |                    0 |
| N/A   29C    P0              41W / 300W |      0MiB / 16384MiB |      0%      Default |
|                                         |                      |                  N/A |
+-----------------------------------------+----------------------+----------------------+
                                                                    

Python PyTorch Pipeline for TDA-based Hallucination Detection
This pipeline uses CodeLlama-7B (a variant of Llama-2-7B specialized for code), fetches attention maps using PyTorch/HuggingFace, extracts Topological Data Analysis (TDA) features, and classifies the result using LightGBM.
1. Setup and Dependencies (Conceptual)

In [None]:
import re
from typing import List, Dict, Tuple, Any

import lightgbm as lgb
import numpy as np
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

# --- Conceptual TDA Library Imports ---
# NOTE: The actual implementation of these TDA tools (like calculating PH, 
# Cross-Barcodes, and MTD scores) requires specialized libraries (e.g., ripser, 
# gudhi, or custom code, which are outside the provided sources).
# We use placeholder functions below based on the algorithms described in the sources [3, 4, 7].

# Model identifier passed to `from_pretrained` for both tokenizer and model.
# (CodeLlama-7B is based on Llama-2-7b, noted for its use in code hallucination research [8-10].)
MODEL_NAME = "codellama/CodeLlama-7b-hf" 
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Index of the layer whose attention maps are used for feature extraction.
# Research suggests intermediate layers often achieve near-optimal performance
# for attention scores [11]; 20 is an example within the commonly optimal
# range (19 to 23 for 32 layers) [11].
TARGET_LAYER = 20 

2. LLM Initialization and Attention Map Extraction (PyTorch)
This phase uses PyTorch via the HuggingFace library to load the Llama Coder model and perform a forward pass while recording the internal attention kernel maps

In [None]:
from accelerate import infer_auto_device_map, dispatch_model
import torch

def load_model_and_tokenizer_optimized(model_name: str):
    """Load a causal LM and its tokenizer, configured to expose attention maps.

    Args:
        model_name: Identifier handed to ``from_pretrained`` for both the
            tokenizer and the model.

    Returns:
        A ``(model, tokenizer)`` pair. The model is loaded in float16 with
        ``device_map="auto"`` and eager attention, and its config is set to
        output attentions.
    """
    # Eager attention is requested explicitly so attention weights can be
    # returned when output_attentions is enabled.
    load_options = dict(
        torch_dtype=torch.float16,
        device_map="auto",
        low_cpu_mem_usage=True,
        attn_implementation="eager",
        trust_remote_code=True,
    )

    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForCausalLM.from_pretrained(model_name, **load_options)

    # Make attention output the default for every forward pass.
    model.config.output_attentions = True

    return model, tokenizer

def extract_attention_features(model, tokenizer, prompt: str, code_output: str) -> Tuple[np.ndarray, int]:
    """
    Run one forward pass and return the attention maps of ``TARGET_LAYER``.

    The prompt and generated code are concatenated as input [5, 7].

    Args:
        model: Causal LM that exposes ``outputs.attentions`` when called with
            ``output_attentions=True``.
        tokenizer: Tokenizer matching ``model``.
        prompt: Prompt text.
        code_output: Generated code, appended directly after the prompt.

    Returns:
        A ``(attention, seq_len)`` pair: the target layer's attention tensor
        with the leading batch axis squeezed out, converted to a NumPy array,
        and the number of tokens in the concatenated sequence.

    Raises:
        RuntimeError: If the model output carries no attention maps.
        IndexError: If ``TARGET_LAYER`` is beyond the last available layer.
    """
    # 1. Prepare input: the sequence x includes the prompt and the generation [5].
    full_sequence = prompt + code_output
    # Follow the model's own device instead of the global DEVICE so the inputs
    # always land where the model actually lives.
    inputs = tokenizer(full_sequence, return_tensors="pt").to(model.device)

    # 2. Run the forward pass without building an autograd graph.
    with torch.no_grad():
        outputs = model(**inputs, output_attentions=True)

    # Attention maps are extracted from internal layers (white-box setting) [12, 14].
    # `attentions` holds one (batch, head, seq_len, seq_len) tensor per layer.
    attentions = outputs.attentions
    if attentions is None:
        raise RuntimeError("Model output contains no attention maps.")

    if TARGET_LAYER >= len(attentions):
        raise IndexError(f"Layer {TARGET_LAYER} out of bounds for model with {len(attentions)} layers.")

    # Drop the batch axis of the selected layer: (head, seq_len, seq_len).
    attn_map_layer_l = attentions[TARGET_LAYER].squeeze(0)

    # The attention maps (kernel similarity maps) are handed on for TDA [15, 16].
    return attn_map_layer_l.cpu().numpy(), inputs.input_ids.size(1)

3. Topological Data Analysis (TDA) Feature Engineering
The TDA approach analyzes the topology of attention maps by treating the attention matrix as a fully-connected weighted graph where nodes are tokens. We extract features based on the concepts of MTD and diagonal sums, which demonstrated relevance for code hallucination detection.

In [None]:
def conceptual_tda_feature_extraction(attn_map_layer_l: np.ndarray, sequence_length: int, prompt_len: int, rng=None) -> Dict[str, float]:
    """
    Conceptual function to derive TDA features (MTD, Barcodes, Diagonal Sums).

    We hypothesize shorter persistent barcodes for hallucinated code.
    MTD measures the multiscale topological distance between the generated
    code manifold (G) and a reference manifold (P) [5, 18, 19].

    Only the diagonal-attention features are computed from the data. The four
    MTD features per head are random placeholders until a persistent-homology
    backend is plugged in; they carry no signal.

    Args:
        attn_map_layer_l: Per-head attention maps, indexed as
            ``attn_map_layer_l[head]`` -> square ``(seq, seq)`` matrix.
        sequence_length: Number of tokens in prompt + generation.
        prompt_len: Number of leading tokens that belong to the prompt.
        rng: Optional ``numpy.random.Generator`` used for the placeholder MTD
            values. When omitted, NumPy's global random state is used.

    Returns:
        Mapping from feature name (suffixed with ``_h<head index>``) to value.
    """
    # The number of heads is the size of the first axis, not the whole shape tuple.
    num_heads = attn_map_layer_l.shape[0]
    num_generated = max(sequence_length - prompt_len, 0)
    draw = np.random.rand if rng is None else rng.random

    all_features: Dict[str, float] = {}

    for head_idx in range(num_heads):
        # Self-attention weights (the diagonal of the head's matrix) are
        # important features [6]; accumulate in float64 for stable sums.
        diagonal = np.diagonal(attn_map_layer_l[head_idx]).astype(np.float64)

        # In TDA the weighted adjacency graph is often derived using
        # w_ij = 1 - max(a_ij, a_ji) for symmetrization [5]; that step is not
        # needed for the diagonal features computed here.

        if prompt_len > 0:
            # Mean diagonal attention over the prompt tokens (P) [6].
            all_features[f'diag_P_sum_h{head_idx}'] = float(diagonal[:prompt_len].sum() / prompt_len)

        if num_generated > 0:
            # Mean diagonal attention over the generated tokens (G) [6].
            generated_diag = diagonal[prompt_len:sequence_length]
            all_features[f'diag_G_sum_h{head_idx}'] = float(generated_diag.sum() / num_generated)

        # Placeholder values standing in for normalized MTD scores [6]:
        # MTD0(P,G)/|G|, MTD1(P,G)/|G|, MTD0(G,P)/|P|, MTD1(G,P)/|P|.
        for mtd_name in ('mtd0_pg', 'mtd1_pg', 'mtd0_gp', 'mtd1_gp'):
            all_features[f'{mtd_name}_h{head_idx}'] = float(draw())

    # In a real implementation, normalization (e.g., MinMax normalization) would be applied [21].
    return all_features

4. Data Simulation and Feature Aggregation
We simulate a dataset structure where features are collected across multiple samples. The target LLM is CodeLlama-7B. Benchmarks like HumanEval or CodeHaluEval provide the necessary (prompt, code) pairs labeled for correctness.

In [None]:
!nvidia-smi

In [None]:
from datasets import load_dataset

import random
from datasets import load_dataset

def load_balanced_code_dataset(num_samples=100):
    """Load HumanEval and build a balanced correct/hallucinated dataset.

    Each kept problem contributes two entries: the canonical solution with
    label 0 and a mutated copy with label 1. A problem is only kept when the
    mutation actually changed the code, so the same text never appears under
    both labels.

    Args:
        num_samples: Maximum number of problems to include. The returned list
            holds at most ``2 * num_samples`` entries.

    Returns:
        List of ``(prompt, code, label)`` tuples, or ``None`` if the dataset
        could not be loaded or processed.
    """
    max_mutation_attempts = 5  # the mutator is random and may be a no-op

    try:
        dataset = load_dataset("openai_humaneval", split="test")

        balanced_data = []
        skipped = 0

        for example in dataset:
            if len(balanced_data) >= 2 * num_samples:
                break

            prompt = example["prompt"]
            correct_code = example["canonical_solution"]

            # Retry until the mutation differs from the original; otherwise the
            # identical code would be stored with both labels.
            incorrect_code = correct_code
            for _ in range(max_mutation_attempts):
                incorrect_code = introduce_hallucination(correct_code)
                if incorrect_code != correct_code:
                    break
            else:
                skipped += 1
                continue

            # Correct example (label 0) and its hallucinated twin (label 1).
            balanced_data.append((prompt, correct_code, 0))
            balanced_data.append((prompt, incorrect_code, 1))

        if skipped:
            print(f"Skipped {skipped} problems whose code could not be mutated")

        return balanced_data

    except Exception as e:
        print(f"Could not load real dataset: {e}")
        return None

# Mutation strategies understood by introduce_hallucination.
_HALLUCINATION_STRATEGIES = (
    "off_by_one", "wrong_operator", "missing_condition",
    "wrong_variable", "infinite_loop", "type_error",
)

# Each operator maps to a same-length "wrong" counterpart.
_OPERATOR_SWAPS = {
    '+': '-', '-': '+', '*': '/', '/': '*',
    '<': '>', '>': '<', '<=': '>=', '>=': '<=',
    '==': '!=', '!=': '==',
}

# Stand-alone operators only: the look-arounds reject pieces of compound
# tokens such as '->', '**', '//', '+=', '<<'.
_OPERATOR_RE = re.compile(r'(?<![<>=!+\-*/])(==|!=|<=|>=|<|>|\+|-|\*|/)(?![=<>+\-*/])')

# Statement-level `if`/`elif`, `while` and `for ... in range(...)` headers that
# end the line with ':' (optionally followed by a comment).
_IF_STATEMENT_RE = re.compile(r'^(\s*(?:el)?if\s+)(.*?)(\s*:\s*(?:#.*)?)$')
_WHILE_STATEMENT_RE = re.compile(r'^(\s*while\s+)(.*?)(\s*:\s*(?:#.*)?)$')
_FOR_RANGE_RE = re.compile(r'^(\s*for\s+.+?\s+in\s+)range\(.*\)(\s*:\s*(?:#.*)?)$')
_LEN_CALL_RE = re.compile(r'\blen\(\w+\)')

_CANDIDATE_VARIABLES = ('x', 'y', 'z', 'i', 'j', 'k', 'n', 'num', 'val', 'temp')


def _mutate_off_by_one(code: str) -> str:
    """Shift the bounds of `range(... len(...))` loops by one."""
    mutated_lines = []
    for line in code.split('\n'):
        if 'range(' in line and 'len(' in line:
            if 'range(len(' in line:
                line = line.replace('range(len(', 'range(1, len(')
            elif 'range(0, len(' in line:
                line = line.replace('range(0, len(', 'range(1, len(')
            else:
                # Any other form: nudge the first len(...) call up or down.
                delta = random.choice((' + 1', ' - 1'))
                line = _LEN_CALL_RE.sub(lambda m: m.group(0) + delta, line, count=1)
        mutated_lines.append(line)
    return '\n'.join(mutated_lines)


def _mutate_wrong_operator(code: str) -> str:
    """Swap one randomly chosen stand-alone operator for its counterpart."""
    occurrences = list(_OPERATOR_RE.finditer(code))
    if not occurrences:
        return code
    hit = random.choice(occurrences)
    return code[:hit.start()] + _OPERATOR_SWAPS[hit.group(1)] + code[hit.end():]


def _mutate_missing_condition(code: str) -> str:
    """Force the first `if`/`elif` statement to be always true or always false."""
    lines = code.split('\n')
    for idx, line in enumerate(lines):
        header = _IF_STATEMENT_RE.match(line)
        if header:
            # `True or (c)` is always true and `False and (c)` always false;
            # the parentheses keep the original condition from re-associating.
            forced = 'True or' if random.random() > 0.5 else 'False and'
            lines[idx] = f"{header.group(1)}{forced} ({header.group(2)}){header.group(3)}"
            break
    return '\n'.join(lines)


def _mutate_wrong_variable(code: str) -> str:
    """Rename one short variable (matched as a whole word) to a different name."""
    present = [name for name in _CANDIDATE_VARIABLES if re.search(rf'\b{name}\b', code)]
    if not present:
        return code
    target = random.choice(present)
    replacement = random.choice([name for name in _CANDIDATE_VARIABLES if name != target])
    return re.sub(rf'\b{target}\b', replacement, code)


def _mutate_type_error(code: str) -> str:
    """Replace the first int()/str()/float() conversion with a mismatching one."""
    for original, wrong in (('int', 'str'), ('str', 'int'), ('float', 'str')):
        # \b keeps e.g. `print(` from being mistaken for `int(`.
        pattern = rf'\b{original}\('
        if re.search(pattern, code):
            return re.sub(pattern, f'{wrong}(', code, count=1)
    return code


def _mutate_infinite_loop(code: str) -> str:
    """Make the first `while` unconditional, or blow up the first range() loop."""
    lines = code.split('\n')
    for idx, line in enumerate(lines):
        while_header = _WHILE_STATEMENT_RE.match(line)
        if while_header:
            lines[idx] = f"{while_header.group(1)}True or ({while_header.group(2)}){while_header.group(3)}"
            break
        for_header = _FOR_RANGE_RE.match(line)
        if for_header:
            lines[idx] = f"{for_header.group(1)}range(1000000){for_header.group(2)}"
            break
    return '\n'.join(lines)


_HALLUCINATION_MUTATORS = {
    "off_by_one": _mutate_off_by_one,
    "wrong_operator": _mutate_wrong_operator,
    "missing_condition": _mutate_missing_condition,
    "wrong_variable": _mutate_wrong_variable,
    "infinite_loop": _mutate_infinite_loop,
    "type_error": _mutate_type_error,
}


def introduce_hallucination(code: str, strategy=None) -> str:
    """Introduce a common coding error to create a hallucinated example.

    Args:
        code: Source text to corrupt.
        strategy: Optional name of a single strategy to apply (one of
            ``_HALLUCINATION_STRATEGIES``). When omitted, strategies are tried
            in random order until one actually changes the code.

    Returns:
        The mutated code. The input is returned unchanged when it is a single
        line or when no applicable strategy finds anything to mutate.

    Raises:
        ValueError: If ``strategy`` is given but unknown.
    """
    if len(code.split('\n')) <= 1:
        return code  # Can't modify single-line functions much

    if strategy is not None:
        if strategy not in _HALLUCINATION_MUTATORS:
            raise ValueError(f"Unknown hallucination strategy: {strategy!r}")
        return _HALLUCINATION_MUTATORS[strategy](code)

    # Random order; fall through to the next strategy when one is a no-op so
    # the caller gets a real mutation whenever any strategy applies.
    for name in random.sample(_HALLUCINATION_STRATEGIES, len(_HALLUCINATION_STRATEGIES)):
        mutated = _HALLUCINATION_MUTATORS[name](code)
        if mutated != code:
            return mutated
    return code

5. LightGBM Classification
We use LightGBM (LGBM) to classify the code as hallucinated or not, based on the extracted TDA features. Gradient boosting methods like LGBM and XGBoost were successfully used for this feature set in similar research.

In [None]:
def train_and_evaluate_classifier(X: np.ndarray, Y: np.ndarray, feature_names=None):
    """
    Trains and evaluates classifiers for code hallucination detection.

    LightGBM and a random forest are fitted on a stratified 70/30 split; the
    one with the higher test ROC-AUC is reported in detail and returned.

    Args:
        X: Feature matrix, one row per sample.
        Y: Integer class labels (0 = correct, 1 = hallucination).
        feature_names: Optional names for the columns of ``X`` used in the
            feature-importance listing. Generic ``Feature_<i>`` names are used
            when omitted or when the length does not match.

    Returns:
        ``(best_classifier, best_classifier_name)``.
    """
    print("--- Training Classifier ---")
    print(f"Dataset shape: X {X.shape}, Y {Y.shape}")
    print(f"Class distribution: {np.bincount(Y)}")

    from sklearn.model_selection import train_test_split, cross_val_score
    from sklearn.metrics import accuracy_score, f1_score, roc_auc_score, classification_report
    from sklearn.ensemble import RandomForestClassifier
    import lightgbm as lgb

    # Split data
    X_train, X_test, Y_train, Y_test = train_test_split(
        X, Y, test_size=0.3, random_state=42, stratify=Y
    )

    print(f"Training set: {X_train.shape}, Test set: {X_test.shape}")

    # Stratified CV cannot use more folds than the rarest class has samples.
    _, class_counts = np.unique(Y, return_counts=True)
    cv_folds = min(5, int(class_counts.min()))

    # Try multiple classifiers
    classifiers = {
        'LightGBM': lgb.LGBMClassifier(
            objective='binary',
            metric='binary_logloss',
            random_state=42,
            n_estimators=100,
            learning_rate=0.1
        ),
        'RandomForest': RandomForestClassifier(
            n_estimators=100,
            random_state=42,
            max_depth=10
        )
    }

    # Start below any achievable ROC-AUC so a classifier is always selected,
    # even when every score is 0.
    best_score = -np.inf
    best_clf = None
    best_name = ""

    for name, clf in classifiers.items():
        print(f"\n--- Training {name} ---")

        # Train classifier
        clf.fit(X_train, Y_train)

        # Predictions
        Y_pred = clf.predict(X_test)
        Y_proba = clf.predict_proba(X_test)[:, 1]

        # Evaluation
        acc = accuracy_score(Y_test, Y_pred)
        f1 = f1_score(Y_test, Y_pred)
        roc_auc = roc_auc_score(Y_test, Y_proba)

        print(f"{name} Results:")
        print(f"  Accuracy: {acc:.4f}")
        print(f"  F1-Score: {f1:.4f}")
        print(f"  ROC-AUC: {roc_auc:.4f}")

        # Cross-validation (refits clones of clf on folds of the full data)
        if cv_folds >= 2:
            cv_scores = cross_val_score(clf, X, Y, cv=cv_folds, scoring='f1')
            print(f"  Cross-val F1: {cv_scores.mean():.4f} (+/- {cv_scores.std() * 2:.4f})")
        else:
            print("  Cross-val F1: skipped (need at least 2 samples per class)")

        if roc_auc > best_score:
            best_score = roc_auc
            best_clf = clf
            best_name = name

    print(f"\n--- Best Classifier: {best_name} with ROC-AUC {best_score:.4f} ---")

    # Detailed evaluation for best classifier
    Y_pred_best = best_clf.predict(X_test)

    print("\nDetailed Classification Report:")
    print(classification_report(Y_test, Y_pred_best, target_names=['Correct', 'Hallucination']))

    # Feature importance
    if hasattr(best_clf, 'feature_importances_'):
        print("\nTop 10 Most Important Features:")
        feature_importances = best_clf.feature_importances_
        top_indices = np.argsort(feature_importances)[::-1][:10]

        # Use caller-supplied names when they line up with the columns.
        if feature_names is None or len(feature_names) != len(feature_importances):
            feature_names = [f"Feature_{i}" for i in range(len(feature_importances))]

        for i, idx in enumerate(top_indices):
            print(f"  {i+1:2d}. {feature_names[idx]}: {feature_importances[idx]:.4f}")

    return best_clf, best_name

In [None]:
# 1. LLM and Dataset Selection
# Load the model once here; helper functions in later cells read the resulting
# `model` and `tokenizer` as module-level globals.
print(f"Initializing LLM Coder: {MODEL_NAME} (Llama 7B variant) on {DEVICE}")
model, tokenizer = load_model_and_tokenizer_optimized(MODEL_NAME)

In [None]:
model

In [None]:
def debug_extract_attention_features(model, tokenizer, prompt, code):
    """Verbose variant of attention extraction that logs every step.

    Args:
        model: Causal LM exposing ``.device`` and returning ``.attentions``
            when called with ``output_attentions=True``.
        tokenizer: Tokenizer matching ``model``.
        prompt: Prompt text.
        code: Code text; joined to the prompt with a newline.

    Returns:
        ``(attn_maps, seq_len)`` where ``attn_maps`` is a list with one
        head-averaged NumPy array per layer, or ``(None, 0)`` when the output
        has no attentions or any step raises (the traceback is printed).
    """
    print("DEBUG: Starting extract_attention_features")

    try:
        # Combine prompt and code
        full_text = prompt + "\n" + code
        print(f"DEBUG: Full text length: {len(full_text)} chars")

        # Tokenize
        print("DEBUG: Tokenizing...")
        inputs = tokenizer(full_text, return_tensors="pt", truncation=True, max_length=2048)
        inputs = {k: v.to(model.device) for k, v in inputs.items()}

        print(f"DEBUG: Input shapes - input_ids: {inputs['input_ids'].shape}, attention_mask: {inputs['attention_mask'].shape}")

        # Forward pass
        print("DEBUG: Running model forward pass...")
        with torch.no_grad():
            outputs = model(**inputs, output_attentions=True)

        print("DEBUG: Model forward pass completed")

        # Get attention maps
        attentions = outputs.attentions
        print(f"DEBUG: Type of attentions: {type(attentions)}")

        if attentions is None:
            print("DEBUG: No attention maps in output")
            return None, 0

        print(f"DEBUG: Number of attention layers: {len(attentions)}")

        # Process attention maps
        attn_maps = []
        for i, layer_attn in enumerate(attentions):
            # Unwrap tuples BEFORE touching .shape: a tuple has no .shape, so
            # logging the shape first would raise and skip this handling.
            if isinstance(layer_attn, tuple):
                print(f"DEBUG: Layer {i} attention is tuple with {len(layer_attn)} elements")
                layer_attn = layer_attn[0]
                print(f"DEBUG: After taking first element: {layer_attn.shape}")

            print(f"DEBUG: Layer {i} attention shape: {layer_attn.shape}")

            # Average over attention heads
            layer_mean = layer_attn.mean(dim=1)  # (batch, seq_len, seq_len)
            print(f"DEBUG: After mean over heads: {layer_mean.shape}")

            # Remove batch dimension
            layer_mean = layer_mean.squeeze(0)  # (seq_len, seq_len)
            print(f"DEBUG: After squeeze: {layer_mean.shape}")

            attn_maps.append(layer_mean.cpu().numpy())

        seq_len = inputs['input_ids'].shape[1]
        print(f"DEBUG: Final seq_len: {seq_len}")
        print(f"DEBUG: Number of attention maps: {len(attn_maps)}")

        return attn_maps, seq_len

    except Exception as e:
        # Debug helper: report the failure and let the caller carry on.
        print(f"DEBUG: Error in extract_attention_features: {e}")
        import traceback
        traceback.print_exc()
        return None, 0
def extract_attention_features(model, tokenizer, prompt, code):
    """
    Collect one head-averaged attention map per layer for ``prompt + code``.

    Returns a ``(attention_maps, sequence_length)`` pair, where
    ``attention_maps`` is a list of NumPy arrays. ``attention_maps`` is
    ``None`` when the model yields no usable attentions; on any exception the
    result is ``(None, 0)``.
    """
    try:
        # Prompt and code are joined with a newline and tokenized together.
        encoded = tokenizer(
            prompt + "\n" + code,
            return_tensors="pt",
            truncation=True,
            max_length=2048,
        )
        inputs = {name: tensor.to(model.device) for name, tensor in encoded.items()}

        # Token count is recorded before the forward pass.
        seq_len = inputs['input_ids'].shape[1]

        with torch.no_grad():
            outputs = model(**inputs, output_attentions=True)

        attentions = outputs.attentions
        if attentions is None:
            print("Warning: No attention maps in model output")
            return None, seq_len

        attn_maps = []
        for layer_idx, raw_layer in enumerate(attentions):
            # Some outputs wrap the tensor in a tuple; take its first element.
            layer_attention = raw_layer[0] if isinstance(raw_layer, tuple) else raw_layer

            # Only (batch_size, num_heads, seq_len, seq_len) tensors are usable.
            if len(layer_attention.shape) != 4:
                print(f"Warning: Unexpected attention shape {layer_attention.shape} at layer {layer_idx}")
                continue

            # Collapse the head axis, then drop a singleton batch axis.
            head_average = layer_attention.mean(dim=1)
            if head_average.shape[0] == 1:
                head_average = head_average.squeeze(0)

            attn_maps.append(head_average.cpu().numpy())

        if not attn_maps:
            print("Warning: No valid attention maps extracted")
            return None, seq_len

        return attn_maps, seq_len

    except Exception as e:
        print(f"Error in extract_attention_features: {e}")
        return None, 0
def test_single_sample():
    """Smoke-test attention and feature extraction on one dataset sample.

    Loads a single problem, runs the verbose attention extractor on its first
    entry using the global ``model`` and ``tokenizer``, then feeds the result
    to ``conceptual_tda_feature_extraction``. Progress and failures are
    printed; nothing is returned.
    """
    print("=== TESTING SINGLE SAMPLE ===")

    # Load one sample
    samples = load_balanced_code_dataset(num_samples=1)
    if not samples:
        print("Failed to load sample")
        return

    sample_prompt, sample_code, sample_label = samples[0]
    print(f"Prompt: {sample_prompt[:100]}...")
    print(f"Code: {sample_code[:100]}...")
    print(f"Label: {sample_label}")

    # Stage 1: attention extraction
    print("\n--- Testing Attention Extraction ---")
    attn_maps, seq_len = debug_extract_attention_features(model, tokenizer, sample_prompt, sample_code)
    if attn_maps is None:
        print("❌ Attention extraction failed")
        return
    print(f"✅ Attention extraction successful: {len(attn_maps)} layers, seq_len: {seq_len}")

    # Stage 2: feature extraction
    print("\n--- Testing TDA Feature Extraction ---")
    try:
        # Tokenize the prompt on its own to find where the prompt ends.
        prompt_token_len = tokenizer(sample_prompt, return_tensors="pt").input_ids.size(1)

        features = conceptual_tda_feature_extraction(attn_maps, seq_len, prompt_token_len)

        if not features:
            print("❌ TDA feature extraction returned no features")
        else:
            print(f"✅ TDA feature extraction successful: {len(features)} features")
            print(f"Feature names: {list(features.keys())}")
            print(f"Feature values: {list(features.values())}")

    except Exception as e:
        print(f"❌ TDA feature extraction failed: {e}")
        import traceback
        traceback.print_exc()

# Run the test
# NOTE(review): this call executes before the list-based
# conceptual_tda_feature_extraction further down this cell is defined, so on a
# fresh kernel it runs against whichever definition an earlier cell left in
# scope; the TDA step's try/except reports any resulting failure.
test_single_sample()
def conceptual_tda_feature_extraction(attn_maps, seq_len, prompt_token_len):
    """
    Extract summary features from per-layer attention maps.

    For every 2-D map the mean, std, max, min and a prompt-to-code attention
    ratio are recorded, followed by cross-layer statistics of the per-layer
    means. Every processed layer always contributes the same five keys, so
    feature vectors of different samples line up column by column.

    Args:
        attn_maps: Sequence of 2-D NumPy arrays (one per layer), or ``None``.
        seq_len: Number of tokens in prompt + code.
        prompt_token_len: Number of leading tokens belonging to the prompt.

    Returns:
        dict of feature names to values; empty when there is no input or an
        error occurs.
    """
    try:
        features = {}

        if attn_maps is None or len(attn_maps) == 0:
            return features

        for layer_idx, attn_map in enumerate(attn_maps):
            if attn_map is None:
                continue

            # Ensure attn_map is 2D
            if len(attn_map.shape) != 2:
                print(f"Warning: Attention map at layer {layer_idx} has shape {attn_map.shape}, expected 2D")
                continue

            # Half-precision maps are upcast so the statistics are not
            # themselves rounded to float16.
            attn_map = np.asarray(attn_map, dtype=np.float64)

            # Basic statistics
            features[f'layer_{layer_idx}_mean'] = float(np.mean(attn_map))
            features[f'layer_{layer_idx}_std'] = float(np.std(attn_map))
            features[f'layer_{layer_idx}_max'] = float(np.max(attn_map))
            features[f'layer_{layer_idx}_min'] = float(np.min(attn_map))

            # Prompt-to-code attention ratio (simplified). Falls back to 0.0
            # when either block is empty or carries no attention mass, so the
            # key is present for every sample.
            ratio = 0.0
            if prompt_token_len < seq_len:
                prompt_attn = attn_map[:prompt_token_len, :prompt_token_len]
                code_attn = attn_map[prompt_token_len:, prompt_token_len:]

                if np.sum(prompt_attn) > 0 and np.sum(code_attn) > 0:
                    ratio = float(np.mean(prompt_attn) / np.mean(code_attn))
            features[f'layer_{layer_idx}_prompt_code_ratio'] = ratio

        # Cross-layer statistics
        if len(attn_maps) > 1:
            all_means = [
                float(np.mean(np.asarray(attn_map, dtype=np.float64)))
                for attn_map in attn_maps if attn_map is not None
            ]
            if all_means:
                features['cross_layer_mean'] = float(np.mean(all_means))
                features['cross_layer_std'] = float(np.std(all_means))

        print(f"Extracted {len(features)} TDA features")
        return features

    except Exception as e:
        print(f"Error in TDA feature extraction: {e}")
        return {}
def run_complete_training_pipeline_fixed(num_samples=5):
    """Run dataset loading, feature extraction and classifier training.

    Uses the global ``model`` and ``tokenizer``. Samples whose attention maps
    or features cannot be extracted are skipped. Feature dictionaries are
    aligned by feature name before being stacked, so a sample that lacks a
    feature gets 0.0 in that column instead of shifting later columns.

    Args:
        num_samples: Forwarded to ``load_balanced_code_dataset``.

    Returns:
        ``(classifier, X, Y)``. All three are ``None`` when loading or every
        extraction fails; ``classifier`` alone is ``None`` when fewer than 10
        samples were extracted.
    """
    print("=== CODE HALLUCINATION DETECTION TRAINING PIPELINE (FIXED) ===")

    # 1. Load balanced dataset
    print("\n1. Loading balanced dataset...")
    balanced_data = load_balanced_code_dataset(num_samples)

    if balanced_data is None:
        print("Failed to load dataset.")
        return None, None, None

    print(f"Loaded {len(balanced_data)} samples")

    # 2. Test with one sample first
    print("\n2. Testing with single sample...")
    test_single_sample()

    # 3. Extract features from all samples
    print("\n3. Extracting features from all samples...")
    feature_dicts = []
    labels = []

    for i, (prompt, code, label) in enumerate(balanced_data):
        try:
            print(f"Processing sample {i+1}/{len(balanced_data)} - {'Correct' if label == 0 else 'Hallucination'}")

            # Get prompt token length
            prompt_inputs = tokenizer(prompt, return_tensors="pt")
            prompt_token_len = prompt_inputs.input_ids.size(1)

            # Extract attention features using the fixed version
            attn_maps, seq_len = extract_attention_features(model, tokenizer, prompt, code)

            if attn_maps is None:
                print(f"  ⚠️  No attention maps - skipping")
                continue

            if len(attn_maps) == 0:
                print(f"  ⚠️  Empty attention maps - skipping")
                continue

            # Extract TDA features
            features = conceptual_tda_feature_extraction(attn_maps, seq_len, prompt_token_len)

            if features and len(features) > 0:
                feature_dicts.append(features)
                labels.append(label)
                print(f"  ✅ Extracted {len(features)} features")
            else:
                print(f"  ⚠️  No features extracted - skipping")

        except Exception as e:
            print(f"  ❌ Error: {str(e)}")
            continue

    successful_samples = len(feature_dicts)
    if successful_samples == 0:
        print("❌ No successful feature extractions!")
        return None, None, None

    # Union of feature names in first-seen order defines the columns.
    feature_names = list(dict.fromkeys(name for sample in feature_dicts for name in sample))
    X = np.array(
        [[float(sample.get(name, 0.0)) for name in feature_names] for sample in feature_dicts],
        dtype=np.float64,
    )
    Y = np.array(labels)

    print(f"\n✅ Successfully processed {successful_samples}/{len(balanced_data)} samples")
    print(f"Final dataset: X {X.shape}, Y {Y.shape}")

    # 4. Train classifier
    if len(X) >= 10:  # Need minimum samples for training
        print(f"\n4. Training classifier...")
        classifier, classifier_name = train_and_evaluate_classifier(X, Y)
        return classifier, X, Y
    else:
        print(f"❌ Not enough samples for training (need at least 10, got {len(X)})")
        return None, X, Y

# Run the fixed pipeline
# The function returns (classifier, X, Y); any of them may be None when dataset
# loading, feature extraction, or the minimum-sample check fails.
classifier, X, Y = run_complete_training_pipeline_fixed(num_samples=10)


In [None]:
# Minimal test - just check if model works
def minimal_test():
    """Minimal test to check if the model is working.

    Tokenizes a tiny prompt/code pair with the global ``tokenizer`` and runs
    one forward pass through the global ``model`` with attention output
    enabled. Results and failures are printed; nothing is returned.
    """
    test_prompt = "def hello_world():"
    test_code = "    return 'Hello, World!'"

    print("=== MINIMAL TEST ===")

    # Test tokenization
    try:
        inputs = tokenizer(test_prompt + "\n" + test_code, return_tensors="pt")
        print(f"✅ Tokenization works: {inputs['input_ids'].shape}")
    except Exception as e:
        print(f"❌ Tokenization failed: {e}")
        return

    # Test model inference
    try:
        # Tokenizer output is created on the CPU; move it to the model's
        # device, as the extraction helpers do, before the forward pass.
        inputs = {name: tensor.to(model.device) for name, tensor in inputs.items()}
        with torch.no_grad():
            outputs = model(**inputs, output_attentions=True)
        print(f"✅ Model inference works")
        print(f"Attentions type: {type(outputs.attentions)}")
        if outputs.attentions:
            print(f"Number of attention layers: {len(outputs.attentions)}")
            print(f"First layer shape: {outputs.attentions[0].shape}")
    except Exception as e:
        print(f"❌ Model inference failed: {e}")
        return

minimal_test()

In [None]:
# Train on the full feature matrix and label vector returned by the pipeline
# cell; the trainer defined in this notebook is train_and_evaluate_classifier.
train_and_evaluate_classifier(X, Y)

# New methods 

In [None]:
!pip install human_eval ripser

In [None]:
from accelerate import infer_auto_device_map, dispatch_model
import torch
import torch
import numpy as np
import lightgbm as lgb
from transformers import AutoModelForCausalLM, AutoTokenizer
from typing import List, Dict, Tuple, Any
#!/usr/bin/env python
import os
import json
import numpy as np
import torch
import joblib
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModelForCausalLM, set_seed
from human_eval.data import read_problems, write_jsonl
from human_eval.execution import check_correctness
import lightgbm as lgb

# --- Conceptual TDA Library Imports ---
# NOTE: The actual implementation of these TDA tools (like calculating PH, 
# Cross-Barcodes, and MTD scores) requires specialized libraries (e.g., ripser, 
# gudhi, or custom code, which are outside the provided sources).
# We use placeholder functions below based on the algorithms described in the sources [3, 4, 7].

# Model identifier passed to `from_pretrained` for both tokenizer and model.
# (CodeLlama-7B is based on Llama-2-7b, noted for its use in code hallucination research [8-10].)
MODEL_NAME = "codellama/CodeLlama-7b-hf" 
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Index of the layer whose attention maps are used for feature extraction.
# Research suggests intermediate layers often achieve near-optimal performance
# for attention scores [11]; 20 is an example within the commonly optimal
# range (19 to 23 for 32 layers) [11].
TARGET_LAYER = 20 
def load_model_and_tokenizer_optimized(model_name: str):
    """Load a causal LM plus tokenizer, configured to expose attention weights.

    Args:
        model_name: Hugging Face model identifier (or local path) to load.

    Returns:
        ``(model, tokenizer)``. The model is loaded in float16 with automatic
        device placement and ``config.output_attentions`` switched on.
    """
    load_options = dict(
        torch_dtype=torch.float16,
        device_map="auto",
        low_cpu_mem_usage=True,
        # Eager attention is requested so attention weights can be returned.
        attn_implementation="eager",
        trust_remote_code=True,
    )

    tok = AutoTokenizer.from_pretrained(model_name)
    lm = AutoModelForCausalLM.from_pretrained(model_name, **load_options)

    # Ask forward passes to return attention weights by default.
    lm.config.output_attentions = True

    return lm, tok


In [None]:
def detect_hallucination_improved(generated_code, problem, prompt):
    """Label a completion as hallucinated from syntax, text patterns and unit tests.

    Args:
        generated_code: completion text produced by the model. It is the
            continuation of ``prompt``, not a standalone program.
        problem: benchmark problem dict, forwarded to ``check_correctness``.
        prompt: the text the completion continues; prepended for the syntax check.

    Returns:
        ``(is_hallucinated, reason)`` where ``reason`` is one of
        ``"syntax_error"``, ``"high_confidence_hallucination"``,
        ``"execution_error"``, ``"minor_functional_error"``,
        ``"suspicious_patterns"``, ``"correct"`` or ``"other_error: <message>"``.
    """
    try:
        # Syntax-check prompt + completion. The completion on its own is usually an
        # indented function body, which compile() rejects with IndentationError
        # (a SyntaxError subclass) - that would label almost every sample
        # "syntax_error" regardless of its quality.
        compile((prompt or "") + generated_code, '<string>', 'exec')

        # Weighted textual hints of a stubbed-out / placeholder answer.
        # NOTE(review): these are plain substring checks, so e.g. 'pass' also
        # matches identifiers such as `passed`.
        hallucination_indicators = [
            ('TODO', 0.8), ('pass', 0.3), ('...', 0.9),
            ('raise NotImplementedError', 0.95), ('# Write', 0.6),
            ('return None', 0.4), ('return 0', 0.4), ('placeholder', 0.9)
        ]

        hallucination_score = 0
        for pattern, weight in hallucination_indicators:
            if pattern in generated_code:
                hallucination_score += weight

        # Execute the benchmark tests for functional correctness.
        exec_result = check_correctness(problem, generated_code, timeout=3.0, completion_id="temp")

        # Combine the test outcome with the pattern score.
        if not exec_result["passed"]:
            if hallucination_score > 0.7:
                return True, "high_confidence_hallucination"
            elif "exception" in str(exec_result).lower():
                return True, "execution_error"
            else:
                return False, "minor_functional_error"  # Not hallucination, just wrong
        else:
            if hallucination_score > 0.5:
                return True, "suspicious_patterns"
            else:
                return False, "correct"

    except SyntaxError:
        return True, "syntax_error"
    except Exception as e:
        return True, f"other_error: {e}"

In [None]:
def split_problems_by_task(problems, test_ratio=0.3):
    """Randomly partition problems into train/test dicts at task level.

    Task ids are shuffled with NumPy's global RNG, so all generations of one
    task land on the same side of the split.

    Args:
        problems: mapping of task_id -> problem.
        test_ratio: fraction of tasks assigned to the test side.

    Returns:
        ``(train_problems, test_problems)`` dicts.
    """
    shuffled_ids = list(problems.keys())
    np.random.shuffle(shuffled_ids)

    n_train = int(len(shuffled_ids) * (1 - test_ratio))
    train_ids, test_ids = shuffled_ids[:n_train], shuffled_ids[n_train:]

    return (
        {task_id: problems[task_id] for task_id in train_ids},
        {task_id: problems[task_id] for task_id in test_ids},
    )

# Usage
# train_problems, test_problems = split_problems_by_task(problems)
# training_data = collect_training_data(model, tokenizer, train_problems)  # Train on subset
# results, pass_at_1 = evaluate_pass_at_k(model, tokenizer, test_problems, classifier, feature_names)  # Test on unseen

In [None]:
def enhanced_topological_features(dist, prompt_mask, gen_mask):
    """Summarise the H0/H1 persistence diagrams of a distance matrix.

    Args:
        dist: square distance matrix handed to ``ripser`` with
            ``distance_matrix=True``.
        prompt_mask: boolean mask; its sum is used as the prompt token count.
        gen_mask: boolean mask; its sum is used as the generated token count.

    Returns:
        Dict of raw H0/H1 statistics plus size-normalised variants
        (``mtd_h0_norm``, ``mtd_h1_norm``, ``component_density``, ``loop_density``).
    """
    eps = 1e-10
    prompt_count = prompt_mask.sum()
    gen_count = gen_mask.sum()

    diagrams = ripser(dist, distance_matrix=True, maxdim=1)['dgms']
    # Drop the last H0 bar (presumably the infinite one - verify against ripser).
    h0_bars = diagrams[0][:-1]
    h1_bars = diagrams[1]

    features = {}

    # --- H0: connected components ---
    if len(h0_bars) > 0:
        h0_life = h0_bars[:, 1] - h0_bars[:, 0]
        features['h0_max_persistence'] = np.max(h0_life)
        features['h0_mean_persistence'] = np.mean(h0_life)
        features['h0_std_persistence'] = np.std(h0_life)
        features['h0_num_components'] = len(h0_bars)
        # Computed on raw lifetimes; they are not normalised to sum to one.
        features['h0_persistence_entropy'] = -np.sum(h0_life * np.log(h0_life + eps))
    else:
        features['h0_max_persistence'] = 0
        features['h0_mean_persistence'] = 0
        features['h0_std_persistence'] = 0
        features['h0_persistence_entropy'] = 0
        features['h0_num_components'] = 0

    # --- H1: loops ---
    if len(h1_bars) > 0:
        h1_life = h1_bars[:, 1] - h1_bars[:, 0]
        features['h1_total_persistence'] = np.sum(h1_life)
        features['h1_max_persistence'] = np.max(h1_life)
        features['h1_num_loops'] = len(h1_bars)
        features['h1_persistence_ratio'] = np.sum(h1_life) / (prompt_count + eps)
    else:
        features['h1_total_persistence'] = 0
        features['h1_max_persistence'] = 0
        features['h1_persistence_ratio'] = 0
        features['h1_num_loops'] = 0

    # --- Size-normalised variants ---
    features['mtd_h0_norm'] = features['h0_max_persistence'] / (gen_count + eps)
    features['mtd_h1_norm'] = features['h1_total_persistence'] / (prompt_count + eps)
    features['component_density'] = features['h0_num_components'] / (gen_count + eps)
    features['loop_density'] = features['h1_num_loops'] / (prompt_count + eps)

    return features

In [None]:
def enhanced_attention_analysis(attention, prompt_len):
    """Compute scalar statistics of a token-by-token attention matrix.

    The matrix is split at ``prompt_len`` into prompt/prompt, generated/generated
    and the two cross blocks. Statistics whose block is empty (or, for the
    standard deviation, has fewer than two samples) are reported as 0.0 instead
    of NaN, so the output is always a finite feature vector.

    Args:
        attention: square 2-D torch tensor; rows are queries, columns are keys.
        prompt_len: number of leading positions that belong to the prompt.

    Returns:
        Dict mapping feature name -> Python float.
    """
    n = attention.shape[0]
    gen_len = n - prompt_len
    has_prompt = prompt_len > 0
    has_gen = gen_len > 0
    has_cross = has_prompt and has_gen

    # Self-attention blocks
    prompt_self = attention[:prompt_len, :prompt_len]
    gen_self = attention[prompt_len:, prompt_len:]

    # Cross-attention blocks
    gen_to_prompt = attention[prompt_len:, :prompt_len]
    prompt_to_gen = attention[:prompt_len, prompt_len:]

    prompt_diag = torch.diag(prompt_self)
    gen_to_prompt_mean = torch.mean(gen_to_prompt).item() if has_cross else 0.0

    features = {
        # Self-attention metrics
        'prompt_self_attn_mean': torch.mean(prompt_diag).item() if has_prompt else 0.0,
        'gen_self_attn_mean': torch.mean(torch.diag(gen_self)).item() if has_gen else 0.0,
        # torch.std applies Bessel's correction, so a single sample yields NaN.
        'prompt_self_attn_std': torch.std(prompt_diag).item() if prompt_len > 1 else 0.0,

        # Cross-attention metrics
        'gen_to_prompt_mean': gen_to_prompt_mean,
        'gen_to_prompt_max': torch.max(gen_to_prompt).item() if has_cross else 0.0,
        'attention_imbalance': (
            gen_to_prompt_mean - torch.mean(prompt_to_gen).item() if has_cross else 0.0
        ),

        # Entropy / focus measures
        'attention_entropy': compute_attention_entropy(attention),
        'gen_attention_focus': (
            torch.mean(gen_self).item() / (gen_to_prompt_mean + 1e-10) if has_cross else 0.0
        ),
    }

    return features

def compute_attention_entropy(attention_matrix):
    """Return the Shannon entropy (natural log) of the flattened attention mass.

    All entries are normalised to sum to one before the entropy is taken; a
    small epsilon guards both the division and the logarithm.

    Args:
        attention_matrix: torch tensor of non-negative attention weights.

    Returns:
        Entropy as a Python float.
    """
    # Flatten and normalize
    flat_attn = attention_matrix.flatten()
    flat_attn = flat_attn / (torch.sum(flat_attn) + 1e-10)

    # Compute entropy
    entropy = -torch.sum(flat_attn * torch.log(flat_attn + 1e-10))
    return entropy.item()

In [None]:
def train_regularized_classifier(data):
    """Fit a regularised LightGBM classifier and report 5-fold stratified CV F1.

    Args:
        data: iterable of records, each with a ``"features"`` dict and a
            ``"label"``. Features absent from a record default to 0.

    Returns:
        ``(clf, feature_keys)``: the classifier refit on all of ``data`` and the
        ordered feature names that define each input row.
    """
    # Fixed column order of the feature matrix.
    feature_keys = [
        'mtd_h0_norm', 'mtd_h1_norm', 'prompt_self_attn_mean',
        'gen_self_attn_mean', 'mean_log_prob', 'gen_to_prompt_mean',
        'attention_entropy', 'h1_num_loops', 'component_density',
    ]

    rows, labels = [], []
    for record in data:
        record_features = record["features"]
        rows.append([record_features.get(key, 0) for key in feature_keys])
        labels.append(record["label"])
    X, y = np.array(rows), np.array(labels)

    # Report class balance before training.
    print(f"Class distribution: {np.bincount(y)}")

    from sklearn.model_selection import cross_val_score, StratifiedKFold

    lgbm_params = dict(
        objective='binary',
        random_state=42,
        n_estimators=100,
        max_depth=6,  # shallow trees as regularisation
        learning_rate=0.1,
        subsample=0.8,
        colsample_bytree=0.8,
        reg_alpha=0.1,
        reg_lambda=0.1,
        verbose=-1,
    )
    clf = lgb.LGBMClassifier(**lgbm_params)

    # Stratified 5-fold cross-validation, scored by F1.
    folds = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    cv_scores = cross_val_score(clf, X, y, cv=folds, scoring='f1')
    print(f"Cross-val F1 scores: {cv_scores}")
    print(f"Mean CV F1: {np.mean(cv_scores):.4f} (+/- {np.std(cv_scores):.4f})")

    # Final model uses every sample.
    clf.fit(X, y)

    return clf, feature_keys

In [None]:
def realistic_evaluation(model, tokenizer, test_problems, classifier, feature_names):
    """Evaluate pass rate when the classifier picks one candidate per task.

    For every task one completion is generated per seed in ``SEEDS``; each is
    scored with the classifier's hallucination probability, the lowest-scoring
    candidate is executed against the task's tests.

    Args:
        model, tokenizer: forwarded to ``generate_with_attention``.
        test_problems: mapping of task_id -> problem dict (needs ``"prompt"``).
        classifier: fitted model exposing ``predict_proba``.
        feature_names: ordered feature names; missing features default to 0.

    Returns:
        ``(results, pass_rate)`` - per-task result dicts and the fraction passed.
    """
    outcomes = []

    for task_id, problem in tqdm(test_problems.items(), desc="Realistic Evaluation"):
        prompt = problem["prompt"]

        # One candidate per seed.
        candidates = []
        for seed in SEEDS:
            generation = generate_with_attention(model, tokenizer, prompt, seed)
            feats = extract_attention_features(generation["attention_matrix"], generation["prompt_len"])

            # Mean token log-probability, with probabilities floored at 1e-10.
            floored_probs = [max(p, 1e-10) for p in generation["token_probs"]]
            feats['mean_log_prob'] = np.mean(np.log(floored_probs))

            row = [feats.get(name, 0) for name in feature_names]
            p_halluc = classifier.predict_proba([row])[0][1]

            candidates.append({
                'code': generation["generated_text"],
                'halluc_prob': p_halluc,
                'features': feats,
                'seed': seed
            })

        # Prefer candidates below the 0.5 threshold; otherwise fall back to all.
        below_threshold = [c for c in candidates if c['halluc_prob'] < 0.5]
        pool = below_threshold if below_threshold else candidates
        chosen = min(pool, key=lambda c: c['halluc_prob'])

        verdict = check_correctness(problem, chosen['code'], timeout=3.0, completion_id=task_id)
        outcomes.append({
            "task_id": task_id,
            "passed": verdict["passed"],
            "hallucination_prob": chosen['halluc_prob'],
            "strategy": "safe" if below_threshold else "fallback"
        })

    pass_rate = sum(entry["passed"] for entry in outcomes) / len(outcomes)
    print(f"Realistic pass rate: {pass_rate:.4f}")
    return outcomes, pass_rate

In [1]:
#!/usr/bin/env python
import os
import json
import numpy as np
import torch
import joblib
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModelForCausalLM, set_seed
from human_eval.data import read_problems, write_jsonl
from human_eval.execution import check_correctness
import lightgbm as lgb

# def setup_environment():
#     """Initialize model, tokenizer, and benchmark problems"""
#     print(f"Loading model: {MODEL_NAME}")
#     tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
#     tokenizer.pad_token = tokenizer.eos_token
    
#     model = AutoModelForCausalLM.from_pretrained(
#         MODEL_NAME,
#         device_map="cuda:0",
#         torch_dtype=torch.float16,

#         # load_in_8bit=True,  # Add 8-bit quantization
#         output_attentions=True,
#         return_dict_in_generate=True
#     ).eval()
    
#     print(f"Loading benchmark: {BENCHMARK}")
#     if BENCHMARK == "human_eval":
#         problems = read_problems()
#     elif BENCHMARK == "mbpp":
#         # MBPP loading would go here (simplified for this example)
#         raise NotImplementedError("MBPP support requires additional setup")
#     else:
#         raise ValueError(f"Unsupported benchmark: {BENCHMARK}")
    
#     return model, tokenizer, problems

# Run configuration: edit these constants to change the model, benchmark or
# sampling settings used by the functions below.
MODEL_NAME = "codellama/CodeLlama-7b-hf"  # Checkpoint passed to from_pretrained
BENCHMARK = "human_eval"  # "human_eval" is loaded; "mbpp" raises NotImplementedError in setup_environment
TEMPERATURE = 0.8  # Sampling temperature passed to model.generate
TOP_P = 0.95  # Nucleus-sampling threshold passed to model.generate
MAX_NEW_TOKENS = 256  # Upper bound on generated tokens per completion
NUM_SHOTS = 0  # Zero-shot setting (HumanEval standard)
NUM_SAMPLES = 5  # Generations per problem for classifier training (used as SEEDS[:NUM_SAMPLES])
SEEDS = [0, 1, 2, 3, 4]  # One generation per seed, for reproducibility

# Environment variables only - these lines do not sandbox code execution.
# NOTE(review): HF_HOME presumably relocates the Hugging Face cache to /tmp - confirm.
os.environ["HF_HOME"] = "/tmp"
os.environ["TOKENIZERS_PARALLELISM"] = "True"

def setup_environment():
    """Load the configured model, tokenizer and benchmark problems.

    Reads the module-level ``MODEL_NAME`` and ``BENCHMARK`` settings.

    Returns:
        ``(model, tokenizer, problems)``.

    Raises:
        NotImplementedError: if ``BENCHMARK`` is ``"mbpp"``.
        ValueError: for any other unsupported benchmark name.
    """
    print(f"Loading model: {MODEL_NAME}")
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    # Reuse the end-of-sequence token as the padding token.
    tokenizer.pad_token = tokenizer.eos_token

    load_options = dict(
        torch_dtype=torch.float16,
        device_map="auto",
        low_cpu_mem_usage=True,
        # Eager attention is requested so attention weights can be returned.
        attn_implementation="eager",
        trust_remote_code=True,
    )
    model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, **load_options)

    # Ask forward passes to return attention weights by default.
    model.config.output_attentions = True

    print(f"Loading benchmark: {BENCHMARK}")
    if BENCHMARK == "mbpp":
        # MBPP loading is not implemented in this notebook.
        raise NotImplementedError("MBPP support requires additional setup")
    if BENCHMARK != "human_eval":
        raise ValueError(f"Unsupported benchmark: {BENCHMARK}")
    problems = read_problems()

    return model, tokenizer, problems
def generate_with_attention(model, tokenizer, prompt, seed):
    """Sample one completion and start building a token-by-token attention matrix.

    NOTE(review): this body looks truncated. The per-step loop only builds
    ``stacked_step`` and never writes it into ``full_attention``, and the final
    ``return`` references ``dist``, which is not assigned anywhere in this
    function. Callers in this notebook index the result with
    ``"attention_matrix"``, ``"prompt_len"``, ``"token_probs"`` and
    ``"generated_text"``, which a tuple return cannot satisfy. A second
    definition with the same name appears later in this cell and replaces
    this one at run time.

    Args:
        model: model exposing ``generate`` and ``device``.
        tokenizer: callable tokenizer that also provides ``decode``.
        prompt: text to complete.
        seed: value passed to ``set_seed`` before sampling.
    """
    set_seed(seed)

    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    prompt_len = inputs["input_ids"].shape[1]

    # Sample a single completion, asking generate() for attentions and scores.
    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=MAX_NEW_TOKENS,
            do_sample=True,
            temperature=TEMPERATURE,
            top_p=TOP_P,
            num_return_sequences=1,
            output_attentions=True,
            output_scores=True,
            return_dict_in_generate=True
        )

    # sequences[0] is sliced at prompt_len, i.e. it holds prompt + generated ids.
    full_seq = outputs.sequences[0]
    generated_ids = full_seq[prompt_len:]
    generated_text = tokenizer.decode(generated_ids, skip_special_tokens=True)
    total_len = len(full_seq)

    # Zero-initialised (total_len x total_len) matrix for the averaged attention.
    full_attention = torch.zeros((total_len, total_len), device=model.device, dtype=torch.float32)

    # Step 1: attentions of the first generation step (the prompt pass).
    initial_attentions = outputs.attentions[0]  # one tensor per layer

    # Stack the per-layer tensors after dropping the leading (batch) dimension.
    # assumes each layer tensor is [1, heads, P, P] - TODO confirm
    stacked_initial = torch.stack([
        layer.squeeze(0) for layer in initial_attentions  # remove batch dim
    ], dim=0)  # -> [num_layers, heads, P, P]

    # Average over layers (dim=0) and heads (dim=1) and store as the prompt block.
    avg_initial = stacked_initial.mean(dim=(0, 1))
    full_attention[:prompt_len, :prompt_len] = avg_initial

    # Step 2: remaining generation steps; `step` counts up from prompt_len.
    for step, step_attentions in enumerate(outputs.attentions[1:], start=prompt_len):
        # assumes each layer tensor is [1, heads, 1, step+1] - TODO confirm
        stacked_step = torch.stack([
            layer.squeeze(0) for layer in step_attentions  # -> [heads, 1, step+1]
        ], dim=0)  # -> [num_layers, heads, 1, step+1]
        # NOTE(review): stacked_step is never averaged or stored.

    # Boolean masks: True for the first prompt_len positions / for the rest.
    n = full_attention.shape[0]
    prompt_mask = np.zeros(n, dtype=bool)
    prompt_mask[:prompt_len] = True
    gen_mask = ~prompt_mask

    # NOTE(review): `dist` is undefined here, so this line raises NameError.
    return dist, prompt_mask, gen_mask

def compute_topological_features(dist, prompt_mask, gen_mask):
    """Derive MTD-inspired persistence features from a distance matrix.

    Args:
        dist: square distance matrix handed to ``ripser`` with
            ``distance_matrix=True``.
        prompt_mask: boolean mask; its sum is the prompt token count.
        gen_mask: boolean mask; its sum is the generated token count.

    Returns:
        Dict with the maximum H0 persistence, total H1 persistence, number of
        H1 bars, both token counts, and the two persistence values divided by
        the generated / prompt counts respectively (0 when a count is 0).
    """
    n_prompt = prompt_mask.sum()
    n_gen = gen_mask.sum()

    diagrams = ripser(dist, distance_matrix=True, maxdim=1)['dgms']
    # Drop the last H0 bar (presumably the infinite one - verify against ripser).
    h0_bars = diagrams[0][:-1]
    h1_bars = diagrams[1]

    if len(h0_bars) > 0:
        h0_max = np.max(h0_bars[:, 1] - h0_bars[:, 0])
    else:
        h0_max = 0

    if len(h1_bars) > 0:
        h1_total = np.sum(h1_bars[:, 1] - h1_bars[:, 0])
    else:
        h1_total = 0

    features = {
        'h0_max_persistence': h0_max,
        'h1_total_persistence': h1_total,
        'num_h1_bars': len(h1_bars),
        'prompt_size': n_prompt,
        'gen_size': n_gen
    }

    # Normalise by the size of the generated / prompt part.
    features['h0_max_persistence_norm'] = (
        features['h0_max_persistence'] / n_gen if n_gen > 0 else 0
    )
    features['h1_total_persistence_norm'] = (
        features['h1_total_persistence'] / n_prompt if n_prompt > 0 else 0
    )
    return features

def extract_attention_features(attention, prompt_len):
    """Build the basic feature dict for one generation.

    Combines the normalised topological features with the mean diagonal
    attention of the prompt and generated blocks and the two segment lengths.

    Args:
        attention: square torch attention matrix over prompt + generated tokens.
        prompt_len: number of leading positions that belong to the prompt.

    Returns:
        Dict of scalar features.
    """
    dist, prompt_mask, gen_mask = build_distance_matrix(attention, prompt_len)
    topo = compute_topological_features(dist, prompt_mask, gen_mask)

    prompt_block = attention[:prompt_len, :prompt_len]
    gen_block = attention[prompt_len:, prompt_len:]

    # Mean self-attention (diagonal) per block; 0 when the block is empty.
    if prompt_len > 0:
        prompt_self = torch.diagonal(prompt_block).mean().item()
    else:
        prompt_self = 0
    if gen_block.shape[0] > 0:
        gen_self = torch.diagonal(gen_block).mean().item()
    else:
        gen_self = 0

    return {
        # Topological features (normalised)
        'mtd_h0_norm': topo['h0_max_persistence_norm'],
        'mtd_h1_norm': topo['h1_total_persistence_norm'],
        # Attention statistics
        'prompt_self_attn': prompt_self,
        'gen_self_attn': gen_self,
        # Basic size features
        'prompt_len': prompt_len,
        'gen_len': attention.shape[0] - prompt_len,
        'h1_num_bars': topo['num_h1_bars']
    }
def detect_hallucination(generated_code, problem):
    """Return True when the completion fails the problem's tests.

    Any exception raised while running the check is printed and also counted
    as a hallucination.

    Args:
        generated_code: completion text to evaluate.
        problem: full benchmark problem dict passed to ``check_correctness``.
    """
    try:
        passed = check_correctness(
            problem=problem,
            completion=generated_code,
            timeout=3.0,
            completion_id="temp",
        )["passed"]
        # Hallucination = the tests did not pass.
        return not passed
    except Exception as e:
        print(f"Error in execution for problem: {e}")
        return True

def collect_training_data(model, tokenizer, problems):
    """Generate completions and assemble labelled feature records.

    For each problem, one completion is sampled per seed in
    ``SEEDS[:NUM_SAMPLES]``; its attention features, mean token log-probability
    and execution-based hallucination label are stored.

    Args:
        model, tokenizer: forwarded to ``generate_with_attention``.
        problems: mapping of task_id -> problem dict (needs ``"prompt"``).

    Returns:
        List of dicts with keys ``"features"``, ``"label"`` (1 = hallucinated,
        0 = correct) and ``"code"``.
    """
    records = []
    for task_id, problem in tqdm(problems.items(), desc="Collecting data"):
        prompt = problem["prompt"]
        for seed in SEEDS[:NUM_SAMPLES]:
            generation = generate_with_attention(model, tokenizer, prompt, seed)
            features = extract_attention_features(
                generation["attention_matrix"],
                generation["prompt_len"],
            )

            # Mean log-probability with probabilities floored at 1e-10.
            floored_probs = [max(p, 1e-10) for p in generation["token_probs"]]
            features.update({
                'mean_log_prob': np.mean(np.log(floored_probs)),
                'task_id': task_id,
                'seed': seed
            })

            # Label from test execution.
            hallucinated = detect_hallucination(generation["generated_text"], problem)
            records.append({
                "features": features,
                "label": int(hallucinated),
                "code": generation["generated_text"]
            })

    return records
import pandas as pd 
def train_classifier(data):
    """Train a LightGBM classifier on the collected features.

    Feature columns are the sorted keys of the first record's ``"features"``
    dict, excluding the ``"task_id"`` and ``"seed"`` bookkeeping entries. The
    first 80% of the records (in the given order) are used for training and
    the remainder as a hold-out set for the reported accuracy.

    Args:
        data: sequence of records with ``"features"`` (dict) and ``"label"``.

    Returns:
        ``(clf, feature_names)``: the fitted classifier and its column order.

    Raises:
        ValueError: if fewer than two records are supplied, because the 80/20
            split would leave the training set empty.
    """
    data = list(data)
    if len(data) < 2:
        raise ValueError(
            "train_classifier needs at least 2 records (one to train on, one held out); "
            f"got {len(data)}"
        )

    feature_names = sorted(
        k for k in data[0]["features"].keys() if k not in ["task_id", "seed"]
    )
    X = np.array([[item["features"][k] for k in feature_names] for item in data])
    y = np.array([item["label"] for item in data])

    # Simple ordered hold-out: no shuffling, the tail of `data` is the test set.
    split_idx = int(0.8 * len(X))
    X_train, X_test = X[:split_idx], X[split_idx:]
    y_train, y_test = y[:split_idx], y[split_idx:]

    clf = lgb.LGBMClassifier(
        objective='binary',
        random_state=42,
        verbose=-1  # Silences LightGBM output, remove if you want to see training logs
    )
    clf.fit(X_train, y_train)

    train_acc = clf.score(X_train, y_train)
    test_acc = clf.score(X_test, y_test)

    print(f"Classifier trained. Train accuracy: {train_acc:.4f}, Test accuracy: {test_acc:.4f}")
    return clf, feature_names

def evaluate_pass_at_k(model, tokenizer, problems, classifier, feature_names):
    """Evaluate pass@1 after picking the best of several sampled candidates.

    For every problem one completion is generated per seed in ``SEEDS``. Each
    candidate is scored as ``(1 - hallucination probability) + mean token
    log-probability``; the top-scoring candidate is executed against the tests.

    Args:
        model, tokenizer: forwarded to ``generate_with_attention``.
        problems: mapping of task_id -> problem dict; the first entry must
            contain ``"task_id"``, ``"prompt"`` and ``"test"``.
        classifier: fitted model exposing ``predict_proba``.
        feature_names: column order expected by ``classifier``.

    Returns:
        ``(results, pass_at_1)``. ``results`` holds one dict per task with the
        chosen completion, whether it passed, and the classifier's hallucination
        probability for that chosen completion. Returns ``([], 0.0)`` when
        ``problems`` is empty.

    Raises:
        ValueError: if the sampled problem dict lacks a required key.
    """
    if not problems:
        return [], 0.0

    # Validate the schema on one sample before doing any expensive generation.
    sample_problem = next(iter(problems.values()))
    required_keys = ["task_id", "prompt", "test"]
    missing_keys = [k for k in required_keys if k not in sample_problem]
    if missing_keys:
        raise ValueError(f"Problem dictionary missing required keys: {missing_keys}")

    results = []
    total_correct = 0
    for task_id, problem in tqdm(problems.items(), desc="Evaluating pass@1"):
        prompt = problem["prompt"]
        best_code = None
        best_score = -np.inf
        best_halluc_prob = None

        # Generate one candidate per seed and keep the best-scoring one.
        for seed in SEEDS:
            gen_result = generate_with_attention(model, tokenizer, prompt, seed)
            attn_features = extract_attention_features(
                gen_result["attention_matrix"],
                gen_result["prompt_len"]
            )

            # Mean token log-probability (epsilon avoids log(0)).
            probs_array = np.array(gen_result["token_probs"])
            mean_log_prob = np.mean(np.log(probs_array + 1e-10))
            attn_features['mean_log_prob'] = mean_log_prob

            # Build the classifier row; features the extractor did not produce
            # get a default (0.0 for *norm*/*prob* names, 1.0 otherwise).
            feature_values = [
                attn_features[k] if k in attn_features
                else (0.0 if "norm" in k or "prob" in k else 1.0)
                for k in feature_names
            ]
            X = pd.DataFrame([feature_values], columns=feature_names)
            halluc_prob = classifier.predict_proba(X)[0][1]

            # Score = confidence in non-hallucination + log probability
            score = (1 - halluc_prob) + mean_log_prob
            # `best_code is None` keeps a candidate even if every score is NaN.
            if best_code is None or score > best_score:
                best_score = score
                best_code = gen_result["generated_text"]
                # Remember the winner's own probability; recomputing it later
                # from best_score would use the last seed's mean_log_prob.
                best_halluc_prob = float(halluc_prob)

        # Execute the selected candidate.
        result = check_correctness(
            problem=problem,
            completion=best_code,
            timeout=3.0,
            completion_id=task_id
        )
        is_correct = result["passed"]
        total_correct += int(is_correct)
        results.append({
            "task_id": task_id,
            "completion": best_code,
            "passed": is_correct,
            "hallucination_prob": best_halluc_prob
        })

    pass_at_1 = total_correct / len(problems)
    print(f"Final pass@1 after hallucination filtering: {pass_at_1:.4f}")
    return results, pass_at_1

def _mean_attention_over_layers_and_heads(layer_attentions, device):
    """Average one generation step's per-layer attention over layers and heads.

    Each layer tensor is reduced separately (in float32, on ``device``), so the
    layers may live on different devices and are never stacked in memory.
    assumes each layer tensor is [1, heads, q_len, k_len] - TODO confirm
    """
    total = None
    for layer in layer_attentions:
        per_layer = layer[0].to(device=device, dtype=torch.float32).mean(dim=0)
        total = per_layer if total is None else total + per_layer
    return total / len(layer_attentions)


def generate_with_attention(model, tokenizer, prompt, seed):
    """Sample one completion and reconstruct its full attention matrix.

    Args:
        model: causal LM exposing ``generate`` and ``device``.
        tokenizer: tokenizer matching ``model``.
        prompt: text to complete.
        seed: value passed to ``set_seed`` before sampling.

    Returns:
        Dict with the keys used by the callers in this notebook:
        ``"generated_text"`` (decoded completion), ``"attention_matrix"``
        (float32 tensor of shape [total_len, total_len], averaged over layers
        and heads), ``"prompt_len"`` (int) and ``"token_probs"`` (list of
        floats, one per generated token).
    """
    set_seed(seed)

    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    prompt_len = inputs["input_ids"].shape[1]

    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=MAX_NEW_TOKENS,
            do_sample=True,
            temperature=TEMPERATURE,
            top_p=TOP_P,
            num_return_sequences=1,
            output_attentions=True,
            output_scores=True,
            return_dict_in_generate=True
        )

    full_seq = outputs.sequences[0]
    generated_ids = full_seq[prompt_len:]
    generated_text = tokenizer.decode(generated_ids, skip_special_tokens=True)
    total_len = len(full_seq)

    device = model.device
    full_attention = torch.zeros((total_len, total_len), device=device, dtype=torch.float32)

    if outputs.attentions:
        # Step 0 covers the whole prompt: a [P, P] block.
        full_attention[:prompt_len, :prompt_len] = _mean_attention_over_layers_and_heads(
            outputs.attentions[0], device
        )

        # Every later step holds a single query row. The step fed with the token
        # at position `row` attends over keys 0..row. The last sampled token is
        # never fed back into the model, so its row stays zero.
        for row, step_attentions in enumerate(outputs.attentions[1:], start=prompt_len):
            if row >= total_len:
                break
            step_row = _mean_attention_over_layers_and_heads(step_attentions, device)[-1]
            key_len = min(step_row.shape[0], total_len)
            full_attention[row, :key_len] = step_row[:key_len]

    # Probability the sampling distribution assigned to each emitted token.
    # NOTE(review): `scores` are presumably the logits after temperature/top-p
    # processing, so these are probabilities under the warped distribution.
    token_probs = []
    for token_id, step_scores in zip(generated_ids, outputs.scores):
        step_probs = torch.softmax(step_scores[0].float(), dim=-1)
        token_probs.append(step_probs[int(token_id)].item())

    return {
        "generated_text": generated_text,
        "attention_matrix": full_attention,
        "prompt_len": prompt_len,
        "token_probs": token_probs,
    }


def build_distance_matrix(attention, prompt_len):
    """Turn an attention matrix into a distance matrix plus prompt/generated masks.

    Attention is symmetrised with an element-wise maximum and mapped to
    ``1 - attention``; the diagonal and the prompt/prompt block are set to zero.

    NOTE(review): the original body of this function is missing from the
    notebook (only its mask-building tail survived). Zeroing the prompt block
    follows the "MTD-inspired" wording used by the consumers - confirm that
    this is the intended construction.

    Args:
        attention: square attention matrix (torch tensor or array-like).
        prompt_len: number of leading positions that belong to the prompt.

    Returns:
        ``(dist, prompt_mask, gen_mask)``: a float NumPy distance matrix and two
        boolean masks selecting prompt and generated positions.
    """
    if torch.is_tensor(attention):
        attn = attention.detach().float().cpu().numpy()
    else:
        attn = np.asarray(attention, dtype=np.float32)

    symmetric = np.maximum(attn, attn.T)
    dist = np.clip(1.0 - symmetric, 0.0, None)
    np.fill_diagonal(dist, 0.0)
    # Collapse the prompt tokens: zero distance between any two of them.
    dist[:prompt_len, :prompt_len] = 0.0

    # Masks for prompt (P) and generated (G) tokens.
    n = dist.shape[0]
    prompt_mask = np.zeros(n, dtype=bool)
    prompt_mask[:prompt_len] = True
    gen_mask = ~prompt_mask

    return dist, prompt_mask, gen_mask


def analyze_results(results):
    """Print a summary of evaluation results.

    Reports the number of problems, the pass count and percentage, the mean
    hallucination probability, and how often each selection strategy was used
    (entries without a ``"strategy"`` key are counted as ``"unknown"``).
    An empty ``results`` list prints the zero total and a short notice instead
    of dividing by zero.

    Args:
        results: list of dicts with ``"passed"`` and ``"hallucination_prob"``,
            optionally ``"strategy"``.
    """
    from collections import Counter

    total = len(results)
    print(f"Total test problems: {total}")
    if total == 0:
        print("No results to analyze.")
        return

    passed = sum(r["passed"] for r in results)
    avg_halluc_prob = np.mean([r["hallucination_prob"] for r in results])

    # Counter keeps first-seen order, so the printed dict is deterministic.
    strategy_counts = dict(Counter(r.get("strategy", "unknown") for r in results))

    print(f"Problems passed: {passed} ({passed/total*100:.1f}%)")
    print(f"Average hallucination probability: {avg_halluc_prob:.4f}")
    print(f"Strategy usage: {strategy_counts}")

# You'll need to implement the improved functions. Here they are:

def split_problems_by_task(problems, test_ratio=0.3, seed=42):
    """Split problems by task_id for proper train/test separation.

    The shuffle uses a private ``RandomState(seed)``, so the split is
    reproducible without reseeding NumPy's global random generator. With the
    default seed the permutation is the same one that
    ``np.random.seed(42); np.random.shuffle(...)`` produces.

    Args:
        problems: mapping of task_id -> problem.
        test_ratio: fraction of tasks assigned to the test side.
        seed: seed for the private random generator.

    Returns:
        ``(train_problems, test_problems)`` dicts.
    """
    task_ids = list(problems.keys())
    rng = np.random.RandomState(seed)
    rng.shuffle(task_ids)
    split_idx = int(len(task_ids) * (1 - test_ratio))

    train_problems = {tid: problems[tid] for tid in task_ids[:split_idx]}
    test_problems = {tid: problems[tid] for tid in task_ids[split_idx:]}

    return train_problems, test_problems

def collect_training_data_improved(model, tokenizer, problems):
    """Collect labelled training records using the enhanced features and labels.

    For each problem, one completion is sampled per seed in
    ``SEEDS[:NUM_SAMPLES]``. A failure on one (task, seed) pair is printed with
    its traceback and that pair is skipped; collection continues.

    Args:
        model, tokenizer: forwarded to ``generate_with_attention``.
        problems: mapping of task_id -> problem dict (needs ``"prompt"``).

    Returns:
        List of dicts with keys ``"features"``, ``"label"``, ``"code"``,
        ``"task_id"``, ``"seed"`` and ``"reason"``.
    """
    records = []

    for task_id, problem in tqdm(problems.items(), desc="Collecting training data"):
        prompt = problem["prompt"]

        for seed in SEEDS[:NUM_SAMPLES]:
            try:
                generation = generate_with_attention(model, tokenizer, prompt, seed)
                completion = generation["generated_text"]

                features = extract_attention_features_improved(
                    generation["attention_matrix"],
                    generation["prompt_len"],
                )

                # Mean log-probability with probabilities floored at 1e-10.
                floored_probs = [max(p, 1e-10) for p in generation["token_probs"]]
                features['mean_log_prob'] = np.mean(np.log(floored_probs))

                hallucinated, reason = detect_hallucination_improved(completion, problem, prompt)

                records.append({
                    "features": features,
                    "label": int(hallucinated),
                    "code": completion,
                    "task_id": task_id,
                    "seed": seed,
                    "reason": reason
                })

            except Exception as e:
                # Best effort: report the failure and move on to the next seed.
                print(f"Error processing {task_id}, seed {seed}: {e}")
                import traceback
                traceback.print_exc()

    return records

def extract_attention_features_improved(attention, prompt_len):
    """Merge topological and attention statistics into one feature dict.

    Args:
        attention: square attention matrix over prompt + generated tokens.
        prompt_len: number of leading positions that belong to the prompt.

    Returns:
        Dict containing the enhanced topological features, the enhanced
        attention features (which win on any key clash) and the
        ``prompt_len`` / ``gen_len`` sizes.
    """
    dist, prompt_mask, gen_mask = build_distance_matrix(attention, prompt_len)
    topological = enhanced_topological_features(dist, prompt_mask, gen_mask)
    attention_stats = enhanced_attention_analysis(attention, prompt_len)

    return {
        **topological,
        **attention_stats,
        # Basic size metadata
        'prompt_len': prompt_len,
        'gen_len': attention.shape[0] - prompt_len,
    }

def detect_hallucination_improved(generated_code, problem, prompt):
    """Label a completion as hallucinated from syntax, text patterns and unit tests.

    Args:
        generated_code: completion text produced by the model. It is the
            continuation of ``prompt``, not a standalone program.
        problem: benchmark problem dict, forwarded to ``check_correctness``.
        prompt: the text the completion continues; prepended for the syntax check.

    Returns:
        ``(is_hallucinated, reason)`` where ``reason`` is one of
        ``"syntax_error"``, ``"high_confidence_hallucination"``,
        ``"execution_error"``, ``"minor_functional_error"``,
        ``"suspicious_patterns"``, ``"correct"`` or ``"other_error: <message>"``.
    """
    try:
        # Syntax-check prompt + completion. The completion on its own is usually an
        # indented function body, which compile() rejects with IndentationError
        # (a SyntaxError subclass) - that would label almost every sample
        # "syntax_error" regardless of its quality.
        compile((prompt or "") + generated_code, '<string>', 'exec')

        # Weighted textual hints of a stubbed-out / placeholder answer.
        # NOTE(review): these are plain substring checks, so e.g. 'pass' also
        # matches identifiers such as `passed`.
        hallucination_indicators = [
            ('TODO', 0.8), ('pass', 0.3), ('...', 0.9),
            ('raise NotImplementedError', 0.95), ('# Write', 0.6),
            ('return None', 0.4), ('return 0', 0.4), ('placeholder', 0.9)
        ]

        hallucination_score = 0
        for pattern, weight in hallucination_indicators:
            if pattern in generated_code:
                hallucination_score += weight

        # Execute the benchmark tests for functional correctness.
        exec_result = check_correctness(problem, generated_code, timeout=3.0, completion_id="temp")

        # Combine the test outcome with the pattern score.
        if not exec_result["passed"]:
            if hallucination_score > 0.7:
                return True, "high_confidence_hallucination"
            elif "exception" in str(exec_result).lower():
                return True, "execution_error"
            else:
                return False, "minor_functional_error"
        else:
            if hallucination_score > 0.5:
                return True, "suspicious_patterns"
            else:
                return False, "correct"

    except SyntaxError:
        return True, "syntax_error"
    except Exception as e:
        return True, f"other_error: {e}"

def enhanced_topological_features(dist, prompt_mask, gen_mask):
    """Summarise the persistence diagrams of a distance matrix as scalar features.

    ``ripser`` is run on ``dist`` (treated as a precomputed distance matrix,
    homology up to dimension 1). H0 and H1 bar lengths are reduced to a few
    statistics, which are additionally normalised by the mask sums.

    Every value in the returned dict is a plain Python ``int`` or ``float``
    (never a NumPy or tensor scalar), so the dict can be serialised or stacked
    into an array without further conversion.

    Args:
        dist: Square distance matrix accepted by ``ripser``.
        prompt_mask: Object with a ``.sum()`` method; its sum is used as the
            prompt size (presumably a boolean mask over tokens — confirm
            against the caller).
        gen_mask: Same, for the generated part.

    Returns:
        dict mapping feature name to number.
    """
    # float() unwraps NumPy / tensor scalars returned by .sum()
    n_prompt = float(prompt_mask.sum())
    n_gen = float(gen_mask.sum())

    dgms = ripser(dist, distance_matrix=True, maxdim=1)['dgms']

    h0_bars = dgms[0][:-1]  # Exclude the final H0 bar (the infinite one)
    h1_bars = dgms[1]

    features = {}

    # H0 features (connected components)
    if len(h0_bars) > 0:
        h0_persistences = h0_bars[:, 1] - h0_bars[:, 0]
        features['h0_max_persistence'] = float(np.max(h0_persistences))
        features['h0_mean_persistence'] = float(np.mean(h0_persistences))
        features['h0_std_persistence'] = float(np.std(h0_persistences))
        features['h0_num_components'] = len(h0_bars)
    else:
        features['h0_max_persistence'] = 0.0
        features['h0_mean_persistence'] = 0.0
        features['h0_std_persistence'] = 0.0
        features['h0_num_components'] = 0

    # H1 features (loops)
    if len(h1_bars) > 0:
        h1_persistences = h1_bars[:, 1] - h1_bars[:, 0]
        features['h1_total_persistence'] = float(np.sum(h1_persistences))
        features['h1_max_persistence'] = float(np.max(h1_persistences))
        features['h1_num_loops'] = len(h1_bars)
    else:
        features['h1_total_persistence'] = 0.0
        features['h1_max_persistence'] = 0.0
        features['h1_num_loops'] = 0

    # Normalised features; the 1e-10 term avoids division by zero for empty masks
    features['mtd_h0_norm'] = float(features['h0_max_persistence'] / (n_gen + 1e-10))
    features['mtd_h1_norm'] = float(features['h1_total_persistence'] / (n_prompt + 1e-10))
    features['component_density'] = float(features['h0_num_components'] / (n_gen + 1e-10))
    features['loop_density'] = float(features['h1_num_loops'] / (n_prompt + 1e-10))

    return features

def enhanced_attention_analysis(attention, prompt_len):
    """Compute summary statistics of a square attention matrix.

    The matrix is split at ``prompt_len`` into a prompt/prompt block, a
    generated/generated block and a generated-to-prompt block. The diagonal
    of the first two blocks and all entries of the third are summarised.

    Args:
        attention: 2-D array or torch tensor indexed ``[row, column]``. Tensors
            are detached and moved to the CPU first, so tensors that require
            grad or live on another device are accepted.
        prompt_len: Index at which the generated part starts.

    Returns:
        dict with ``prompt_self_attn_mean``, ``gen_self_attn_mean``,
        ``prompt_self_attn_std``, ``gen_to_prompt_mean`` and
        ``gen_to_prompt_max``; a statistic whose block is empty is 0.
    """
    n = attention.shape[0]
    gen_len = n - prompt_len

    if torch.is_tensor(attention):
        # .detach() is required: Tensor.numpy() refuses tensors that require grad
        attention_np = attention.detach().cpu().numpy()
    else:
        attention_np = attention

    empty = np.array([])

    # Self-attention blocks
    prompt_self = attention_np[:prompt_len, :prompt_len]
    gen_self = attention_np[prompt_len:, prompt_len:] if gen_len > 0 else empty

    # Cross-attention block: generated rows attending to prompt columns
    gen_to_prompt = attention_np[prompt_len:, :prompt_len] if gen_len > 0 else empty

    has_prompt = prompt_len > 0
    has_gen_self = gen_self.size > 0
    has_cross = gen_to_prompt.size > 0

    return {
        # Self-attention metrics (diagonal entries only)
        'prompt_self_attn_mean': float(np.mean(np.diag(prompt_self))) if has_prompt else 0,
        'gen_self_attn_mean': float(np.mean(np.diag(gen_self))) if has_gen_self else 0,
        'prompt_self_attn_std': float(np.std(np.diag(prompt_self))) if has_prompt else 0,

        # Cross-attention metrics
        'gen_to_prompt_mean': float(np.mean(gen_to_prompt)) if has_cross else 0,
        'gen_to_prompt_max': float(np.max(gen_to_prompt)) if has_cross else 0,
    }

def train_regularized_classifier(data):
    """Fit a LightGBM binary classifier on a fixed set of feature columns.

    Each item of ``data`` must provide ``item["features"]`` (a dict) and
    ``item["label"]``. Features missing from a sample default to 0. A
    5-fold stratified cross-validation F1 score is printed, after which the
    classifier is fitted on the whole data set.

    Args:
        data: Iterable of sample dicts as described above.

    Returns:
        ``(classifier, feature_keys)`` — the fitted model and the ordered list
        of feature names that define its input columns.
    """
    # Column order of the design matrix
    feature_keys = [
        'mtd_h0_norm', 'mtd_h1_norm', 'prompt_self_attn_mean',
        'gen_self_attn_mean', 'mean_log_prob', 'gen_to_prompt_mean',
        'h1_num_loops', 'component_density', 'prompt_len', 'gen_len',
    ]

    rows, labels = [], []
    for sample in data:
        sample_features = sample["features"]
        rows.append([sample_features.get(name, 0) for name in feature_keys])
        labels.append(sample["label"])

    X = np.array(rows)
    y = np.array(labels)

    # Report class balance
    print(f"Class distribution: {np.bincount(y)}")

    from sklearn.model_selection import cross_val_score, StratifiedKFold

    hyperparameters = dict(
        objective='binary',
        random_state=42,
        n_estimators=100,
        max_depth=6,
        learning_rate=0.1,
        subsample=0.8,
        colsample_bytree=0.8,
        reg_alpha=0.1,
        reg_lambda=0.1,
        verbose=-1,
    )
    clf = lgb.LGBMClassifier(**hyperparameters)

    # Stratified, shuffled 5-fold cross-validation scored with F1
    folds = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    cv_scores = cross_val_score(clf, X, y, cv=folds, scoring='f1')
    print(f"Cross-val F1 scores: {cv_scores}")
    print(f"Mean CV F1: {np.mean(cv_scores):.4f} (+/- {np.std(cv_scores):.4f})")

    # The returned model is trained on every sample
    clf.fit(X, y)

    return clf, feature_keys

def realistic_evaluation(model, tokenizer, test_problems, classifier, feature_names):
    """Evaluate pass rate when the classifier picks one candidate per task.

    For every task, one candidate is generated per seed in ``SEEDS``; the
    classifier scores each candidate's hallucination probability; the
    candidate with the lowest probability is executed against the task's
    tests.

    Tasks for which no candidate could be generated are recorded as failed
    (``strategy == "no_candidates"``, ``hallucination_prob`` is NaN) instead of
    aborting the whole evaluation.

    Args:
        model, tokenizer: Forwarded to ``generate_with_attention``.
        test_problems: Mapping ``task_id -> problem``; each problem must have a
            ``"prompt"`` entry.
        classifier: Object exposing ``predict_proba``.
        feature_names: Ordered feature names defining the classifier input.

    Returns:
        ``(results, pass_rate)``: the per-task result dicts and the fraction of
        tasks whose selected candidate passed (0.0 when there are no tasks).
    """
    results = []

    for task_id, problem in tqdm(test_problems.items(), desc="Realistic Evaluation"):
        prompt = problem["prompt"]

        # Generate one candidate per seed; a failing seed is logged and skipped
        candidates = []
        for seed in SEEDS:
            try:
                gen_result = generate_with_attention(model, tokenizer, prompt, seed)
                features = extract_attention_features_improved(
                    gen_result["attention_matrix"],
                    gen_result["prompt_len"]
                )

                # Clamp probabilities away from zero so the log stays finite
                safe_probs = [max(p, 1e-10) for p in gen_result["token_probs"]]
                features['mean_log_prob'] = np.mean(np.log(safe_probs))

                # Column 1 of predict_proba is read as the hallucination probability
                feature_vector = [features.get(k, 0) for k in feature_names]
                halluc_prob = classifier.predict_proba([feature_vector])[0][1]

                candidates.append({
                    'code': gen_result["generated_text"],
                    'halluc_prob': halluc_prob,
                    'features': features,
                    'seed': seed
                })
            except Exception as e:
                print(f"Error generating candidate for {task_id}, seed {seed}: {e}")
                continue

        if not candidates:
            # Every seed failed: nothing to execute, so the task counts as failed
            print(f"No candidates generated for {task_id}; counting it as failed")
            results.append({
                "task_id": task_id,
                "passed": False,
                "hallucination_prob": float("nan"),
                "strategy": "no_candidates"
            })
            continue

        # Strategy: prefer candidates the classifier considers safe
        safe_candidates = [c for c in candidates if c['halluc_prob'] < 0.5]

        if safe_candidates:
            best_candidate = min(safe_candidates, key=lambda x: x['halluc_prob'])
            strategy = "safe"
        else:
            # All candidates look hallucinated: take the least suspicious one
            best_candidate = min(candidates, key=lambda x: x['halluc_prob'])
            strategy = "fallback"

        # Execute the selected candidate against the task's tests
        result = check_correctness(problem, best_candidate['code'], timeout=3.0, completion_id=task_id)
        results.append({
            "task_id": task_id,
            "passed": result["passed"],
            "hallucination_prob": best_candidate['halluc_prob'],
            "strategy": strategy
        })

    # Guard against an empty problem set
    pass_rate = sum(r["passed"] for r in results) / len(results) if results else 0.0
    print(f"Realistic pass rate: {pass_rate:.4f}")
    return results, pass_rate

# Don't forget to add your existing functions:
# - setup_environment()
# - generate_with_attention()
# - build_distance_matrix() 
# - etc.


# def main():
    

ModuleNotFoundError: No module named 'joblib'

In [None]:
# Step 0: Setup environment
# setup_environment() is not defined in this cell; its three return values are
# bound to the names that the following cells rely on.
print("=== SETTING UP ENVIRONMENT ===")
model, tokenizer, problems = setup_environment()


In [None]:

# Step 1: Split problems for proper evaluation
# split_problems_by_task() is not defined in this cell. test_ratio=0.3 is
# presumably the share of tasks held out for testing — confirm in its definition.
print("\n=== SPLITTING PROBLEMS ===")
train_problems, test_problems = split_problems_by_task(problems, test_ratio=0.3)
print(f"Training problems: {len(train_problems)}, Test problems: {len(test_problems)}")


In [None]:

# Step 2: Collect training data from TRAINING problems only
print("\n=== COLLECTING TRAINING DATA ===")
training_data = collect_training_data_improved(model, tokenizer, train_problems)

# Save raw data for analysis.
# Every NumPy scalar in the feature dict (float32, int64, bool_, ...) is
# unwrapped with .item() because json cannot encode most of those types.
with open("training_data.json", "w") as f:
    json.dump(
        [
            {
                **item,
                "features": {
                    k: v.item() if isinstance(v, np.generic) else v
                    for k, v in item["features"].items()
                },
            }
            for item in training_data
        ],
        f,
        indent=2,
    )
print(f"Saved {len(training_data)} training samples to training_data.json")


In [None]:

# Step 3: Train regularized classifier
# Uses training_data from the previous cell; returns the fitted model together
# with the ordered feature names it was trained on.
print("\n=== TRAINING HALLUCINATION CLASSIFIER ===")
classifier, feature_names = train_regularized_classifier(training_data)

# Save the trained model
# Model and feature names are stored together in one file.
# NOTE(review): requires joblib to be importable; an earlier cell output in this
# notebook shows "ModuleNotFoundError: No module named 'joblib'".
joblib.dump({
    "classifier": classifier,
    "feature_names": feature_names
}, "hallucination_detector.joblib")
print("Saved classifier to hallucination_detector.joblib")


In [None]:

# Step 4: Evaluate on UNSEEN test problems
# realistic_evaluation returns the per-task results and the overall pass rate;
# the latter is reported below as pass@1.
print("\n=== EVALUATING ON TEST PROBLEMS ===")
results, pass_at_1 = realistic_evaluation(
    model, 
    tokenizer, 
    test_problems, 
    classifier, 
    feature_names
)

# Save results
# write_jsonl() is not defined in this cell.
write_jsonl("samples.jsonl", results)
print(f"Results saved to samples.jsonl. Final pass@1: {pass_at_1:.4f}")

# Step 5: Additional analysis
# analyze_results() is not defined in this cell.
print("\n=== PERFORMANCE ANALYSIS ===")
analyze_results(results)

In [3]:
!pip install transformers

Collecting transformers
  Downloading transformers-4.57.1-py3-none-any.whl.metadata (43 kB)
Collecting huggingface-hub<1.0,>=0.34.0 (from transformers)
  Downloading huggingface_hub-0.36.0-py3-none-any.whl.metadata (14 kB)
Collecting pyyaml>=5.1 (from transformers)
  Downloading pyyaml-6.0.3-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl.metadata (2.4 kB)
Collecting regex!=2019.12.17 (from transformers)
  Downloading regex-2025.11.3-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (40 kB)
Collecting tokenizers<=0.23.0,>=0.22.0 (from transformers)
  Downloading tokenizers-0.22.1-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.8 kB)
Collecting safetensors>=0.4.3 (from transformers)
  Downloading safetensors-0.6.2-cp38-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.1 kB)
Collecting hf-xet<2.0.0,>=1.1.3 (from huggingface-hub<1.0,>=0.34.0->transformers)
  Using cached hf_xet-1.2.0-cp37-abi3-manylinux_

In [5]:
!pip install datasets==1.8.0

Collecting datasets==1.8.0
  Downloading datasets-1.8.0-py3-none-any.whl.metadata (9.3 kB)
Collecting pyarrow<4.0.0,>=1.0.0 (from datasets==1.8.0)
  Downloading pyarrow-3.0.0.tar.gz (682 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m682.2/682.2 kB[0m [31m14.3 MB/s[0m  [33m0:00:00[0m
[?25h  Installing build dependencies ... [?25lerror
  [1;31merror[0m: [1msubprocess-exited-with-error[0m
  
  [31m×[0m [32minstalling build dependencies for pyarrow[0m did not run successfully.
  [31m│[0m exit code: [1;36m1[0m
  [31m╰─>[0m [31m[1258 lines of output][0m
  [31m   [0m Ignoring numpy: markers 'python_version < "3.9"' don't match your environment
  [31m   [0m Collecting cython>=0.29
  [31m   [0m   Downloading cython-3.2.1-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl.metadata (6.7 kB)
  [31m   [0m Collecting numpy==1.19.4
  [31m   [0m   Downloading numpy-1.19.4.zip (7.3 MB)
  [31m   [0m [?25l     [90m━━

In [6]:
!git clone https://huggingface.co/datasets/Muennighoff/mbpp

/bin/bash: git: command not found


In [1]:
!pip install tqdm accelerate 

Collecting accelerate
  Downloading accelerate-1.11.0-py3-none-any.whl.metadata (19 kB)
Downloading accelerate-1.11.0-py3-none-any.whl (375 kB)
Installing collected packages: accelerate
Successfully installed accelerate-1.11.0


In [1]:
"""
MBPP Hallucination Detection for Code LLMs
Compares CodeLlama-7B generated code against MBPP reference solutions
"""

import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
# from datasets import load_dataset
import json
from typing import Dict, List, Tuple
import numpy as np
from tqdm import tqdm
import re
from collections import Counter
import ast

# ==================== Configuration ====================
class Config:
    """Static settings for the MBPP hallucination-detection run."""

    # Model checkpoint and the device it should run on
    MODEL_NAME = "codellama/CodeLlama-7b-hf"
    if torch.cuda.is_available():
        DEVICE = "cuda"
    else:
        DEVICE = "cpu"

    # Sampling parameters handed to the generator
    MAX_NEW_TOKENS = 512
    TEMPERATURE = 0.2
    TOP_P = 0.95

    # Evaluation size
    BATCH_SIZE = 1   # kept at 1 for memory efficiency
    NUM_SAMPLES = 5  # number of MBPP samples to evaluate
    

# ==================== Model Setup ====================
class CodeLlamaInference:
    """Loads a causal language model with its tokenizer and samples code completions."""

    def __init__(self, model_name: str, device: str):
        """Load tokenizer and model.

        Args:
            model_name: Checkpoint identifier passed to ``from_pretrained``.
            device: ``"cuda"`` selects float16 weights, anything else float32.
                The tokenised prompt is later moved to this device.
        """
        print(f"Loading model: {model_name}")
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.float16 if device == "cuda" else torch.float32,
            device_map="auto",
            low_cpu_mem_usage=True
        )
        self.device = device
        print(f"Model loaded on {device}")

    def generate_code(self, prompt: str, max_new_tokens: int = 512,
                      temperature: float = 0.2, top_p: float = 0.95) -> str:
        """Sample a completion for ``prompt`` and cut it at the first stop sequence.

        Only the newly generated tokens are decoded. Slicing the decoded text
        by ``len(prompt)`` is unreliable because decoding does not always
        reproduce the prompt character for character.

        Args:
            prompt: Text to complete.
            max_new_tokens: Upper bound on generated tokens.
            temperature: Sampling temperature.
            top_p: Nucleus sampling threshold.

        Returns:
            The stripped completion, truncated before the earliest of a blank
            line, a new comment line, a new ``def`` or a new ``class``.
        """
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.device)
        prompt_token_count = inputs["input_ids"].shape[1]

        # Markers after which the completion has left the requested function
        stop_tokens = ["\n\n", "\n#", "\ndef ", "\nclass "]

        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=max_new_tokens,
                temperature=temperature,
                top_p=top_p,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id,
                eos_token_id=self.tokenizer.eos_token_id,
                # Repetition penalty reduces redundant generation
                repetition_penalty=1.1,
            )

        # The returned sequence starts with the prompt tokens; keep only the rest
        new_tokens = outputs[0][prompt_token_count:]
        generated_code = self.tokenizer.decode(new_tokens, skip_special_tokens=True).strip()

        # Truncate at reasonable endpoints
        for stop_token in stop_tokens:
            if stop_token in generated_code:
                generated_code = generated_code.split(stop_token)[0]

        return generated_code

# ==================== MBPP Dataset Loader ====================
# class MBPPDataset:
#     def __init__(self, num_samples: int = 10):
#         print("Loading MBPP dataset...")
#         self.dataset = load_dataset("mbpp", "sanitized", split="test")
#         self.num_samples = min(num_samples, len(self.dataset))
#         print(f"Loaded {len(self.dataset)} samples, using {self.num_samples}")
    
#     def get_prompt(self, example: Dict) -> str:
#         """Format MBPP example into a prompt for code generation"""
#         # MBPP uses 'prompt' not 'text'
#         task_description = example.get('prompt', example.get('text', ''))
        
#         # Extract function name from reference code
#         func_name = 'solution'
#         if 'def ' in example['code']:
#             try:
#                 func_name = example['code'].split('def ')[1].split('(')[0].strip()
#             except:
#                 pass
        
#         prompt = f"""# Task: {task_description}
# # Write a Python function to solve this task
import json
from typing import Dict, List, Any

# Define the expected structure for a dataset sample
DatasetSample = Dict[str, Any]

class MBPPDataset:
    """MBPP problems loaded from a local file, with prompt-formatting helpers."""

    def __init__(self, num_samples: int = 5, data_path: str = "mbpp/data/sanitized-mbpp.json"):
        """
        Initializes the dataset by loading data from a local JSON file.

        Args:
            num_samples (int): The maximum number of samples to use.
            data_path (str): The path to the local MBPP file. Both a single
                JSON array and JSON Lines (one object per line) are accepted.

        Raises:
            ValueError: If nothing could be loaded from ``data_path``.
        """
        print("Loading MBPP dataset...")

        self.dataset: List[Dict[str, Any]] = self._load_local_json(data_path)

        # An empty list means loading failed (or the file held no records)
        if not self.dataset:
             raise ValueError(f"Failed to load dataset from path: {data_path}. Please check the path and file content.")

        self.num_samples = min(num_samples, len(self.dataset))
        print(f"Loaded {len(self.dataset)} total samples, using {self.num_samples}")

    def _load_local_json(self, path: str) -> List[Dict[str, Any]]:
        """Load a list of records from a JSON array file or a JSON Lines file.

        Returns an empty list (after printing a message) when the file is
        missing, unreadable or not decodable in either format.
        """
        try:
            with open(path, 'r', encoding='utf-8') as f:
                raw_text = f.read()
        except FileNotFoundError:
            print(f"Error: The file was not found at {path}. Have you run 'git clone' and checked the directory structure?")
            return []
        except Exception as e:
            print(f"An unexpected error occurred during file loading: {e}")
            return []

        try:
            # 'sanitized-mbpp.json' is a single JSON array of objects
            data = json.loads(raw_text)
        except json.JSONDecodeError:
            # 'mbpp.json'-style files are newline-delimited JSON: parse line by line
            try:
                data = [json.loads(line) for line in raw_text.splitlines() if line.strip()]
            except json.JSONDecodeError:
                print(f"Error: Could not decode JSON from {path}. Ensure it is a valid JSON array or JSON Lines file.")
                return []

        if isinstance(data, dict):
            # A JSON Lines file with exactly one record parses as a single object
            return [data]
        if not isinstance(data, list):
            print(f"Error: Expected a list of records in {path}, got {type(data).__name__}.")
            return []
        return data

    def get_prompt(self, example: Dict[str, Any]) -> str:
        """Format MBPP example into a prompt for code generation"""
        # MBPP uses 'prompt' not 'text' in the sanitized dataset
        task_description = example.get('prompt', example.get('text', ''))

        # Take the function name from the first 'def ' of the reference code
        func_name = 'solution'
        if 'def ' in example['code']:
            try:
                func_name = example['code'].split('def ')[1].split('(')[0].strip()
            except IndexError:
                # Fallback if split fails unexpectedly
                pass

        # The prompt ends inside the signature so the model continues it
        return (
            f"# Task: {task_description}\n"
            "# Write a Python function to solve this task\n"
            f"def {func_name}("
        )

    def get_reference_code(self, example: Dict) -> str:
        """Get the reference solution from MBPP"""
        return example['code']

    def get_test_cases(self, example: Dict) -> List[str]:
        """Get test assertions from MBPP"""
        return example['test_list']

    def iterate_samples(self):
        """Iterate through the first ``num_samples`` dataset samples"""
        for i in range(self.num_samples):
            yield self.dataset[i]


# ==================== Code Analysis ====================
class CodeAnalyzer:
    """Static helpers that inspect Python source text via ``ast`` (with a regex fallback)."""

    @staticmethod
    def extract_function_signature(code: str) -> str:
        """Return ``name(arg1, arg2)`` for the first function found in ``code``.

        The AST is tried first (positional argument names only). If the code
        does not parse, or contains no function definition, a regex over the
        raw text is used. Returns ``""`` when nothing matches.
        """
        try:
            tree = ast.parse(code)
            for node in ast.walk(tree):
                if isinstance(node, ast.FunctionDef):
                    args = [arg.arg for arg in node.args.args]
                    return f"{node.name}({', '.join(args)})"
        except Exception:
            # Unparseable code: fall through to the regex
            pass

        # Fallback: regex extraction
        match = re.search(r'def\s+(\w+)\s*\((.*?)\)', code)
        if match:
            return f"{match.group(1)}({match.group(2)})"
        return ""

    @staticmethod
    def is_valid_python(code: str) -> bool:
        """Check if code is syntactically valid Python"""
        try:
            ast.parse(code)
            return True
        except Exception:
            return False

    @staticmethod
    def extract_imports(code: str) -> set:
        """Return the set of module names imported by ``code``.

        ``import a.b`` contributes ``"a.b"``; ``from a import b`` contributes
        ``"a"``. Relative imports without a module name (``from . import x``)
        contribute nothing. Unparseable code yields an empty set.
        """
        imports = set()
        try:
            tree = ast.parse(code)
            for node in ast.walk(tree):
                if isinstance(node, ast.Import):
                    for alias in node.names:
                        imports.add(alias.name)
                elif isinstance(node, ast.ImportFrom):
                    # node.module is None for "from . import x"
                    if node.module is not None:
                        imports.add(node.module)
        except Exception:
            pass
        return imports

    @staticmethod
    def count_functions(code: str) -> int:
        """Count ``def`` statements anywhere in ``code``; 0 if it does not parse"""
        try:
            tree = ast.parse(code)
            return sum(1 for node in ast.walk(tree) if isinstance(node, ast.FunctionDef))
        except Exception:
            return 0


# ==================== Hallucination Detection ====================
class HallucinationDetector:
    """Labels a generated completion as hallucinated by comparing it with the reference."""

    def __init__(self):
        self.analyzer = CodeAnalyzer()

    def detect_hallucination(self, prompt: str, generated_code: str,
                            reference_code: str, test_cases: List[str]) -> Dict:
        """Compute detection features and the binary hallucination label.

        The prompt and the completion are first joined into a complete
        function; syntax, signature, tests and length are all evaluated on
        that function (the raw completion alone is only a fragment).

        Args:
            prompt: Prompt that was given to the model.
            generated_code: Completion returned by the model.
            reference_code: Reference solution.
            test_cases: Assertion statements to execute.

        Returns:
            dict with ``generated_valid_syntax``, ``reference_valid_syntax``,
            ``signature_match``, ``tests_passed``, ``length_ratio``,
            ``num_test_cases``, ``has_hallucination`` and
            ``hallucination_binary``.
        """
        complete_generated = self._complete_function(prompt, generated_code)
        features = {}

        # Syntax validity
        features['generated_valid_syntax'] = self.analyzer.is_valid_python(complete_generated)
        features['reference_valid_syntax'] = self.analyzer.is_valid_python(reference_code)

        # Signature comparison (name and argument names must both agree)
        gen_sig = self.analyzer.extract_function_signature(complete_generated)
        ref_sig = self.analyzer.extract_function_signature(reference_code)
        features['signature_match'] = (gen_sig == ref_sig) if gen_sig and ref_sig else False

        # Functional test execution, run once on the complete function
        features['tests_passed'] = self._execute_tests(complete_generated, test_cases)

        # Length of generated function relative to the reference
        gen_length = len(complete_generated.strip())
        ref_length = len(reference_code.strip())
        features['length_ratio'] = gen_length / max(ref_length, 1)

        features['num_test_cases'] = len(test_cases)

        # Binary Classification: Has Hallucination?
        # Hallucination if:
        # - Invalid syntax OR
        # - Signature doesn't match OR
        # - No test passed OR
        # - Extremely short/long code (length ratio < 0.2 or > 5.0)
        has_hallucination = (
            not features['generated_valid_syntax'] or
            not features['signature_match'] or
            features['tests_passed'] == 0 or
            features['length_ratio'] < 0.2 or
            features['length_ratio'] > 5.0
        )

        features['has_hallucination'] = has_hallucination
        features['hallucination_binary'] = 1 if has_hallucination else 0

        return features

    def _complete_function(self, prompt: str, generated_code: str) -> str:
        """Join prompt and completion and, when possible, keep only the function text.

        If the completion closes a signature (``):``) and has an indented
        line, the joined text is scanned line by line: collection starts at
        the first ``def`` line and stops at the first non-blank line that is
        not indented deeper than the most recent ``def``. Otherwise the
        plain concatenation is returned.
        """
        full_code = prompt + generated_code

        # Heuristic for "looks like a complete function": colon and indented body
        if '):' in generated_code and '\n    ' in generated_code:
            function_lines = []
            in_function = False
            indent_level = None

            for line in full_code.split('\n'):
                if line.strip().startswith('def '):
                    in_function = True
                    indent_level = len(line) - len(line.lstrip())
                    function_lines.append(line)
                elif in_function:
                    current_indent = len(line) - len(line.lstrip())
                    if line.strip() == '' or current_indent > indent_level:
                        function_lines.append(line)
                    else:
                        break

            return '\n'.join(function_lines)

        return full_code

    def _execute_tests(self, code: str, test_cases: List[str]) -> int:
        """
        Execute test cases on generated code
        Returns number of passed tests (0 if the code is invalid or fails to run)
        """
        if not self.analyzer.is_valid_python(code):
            return 0

        passed = 0
        namespace = {}

        # NOTE(review): exec() runs model-generated code in this process with no
        # sandbox and no timeout; an infinite loop in the generated code hangs
        # the evaluation. Consider an isolated subprocess with a time limit.
        try:
            exec(code, namespace)
        except (Exception, SystemExit):
            # The definition itself failed to execute: no test can pass
            return 0

        for test in test_cases:
            try:
                exec(test, namespace)
                passed += 1
            except (Exception, SystemExit):
                # A failing assertion or runtime error counts as not passed
                continue

        return passed

# ==================== Get Prompt ====================
def get_prompt(self, example: Dict) -> str:
    """Build a few-shot style prompt for one MBPP example.

    Written with a ``self`` parameter (which it never reads), so it can be
    called as ``get_prompt(None, example)`` or attached to a class.

    Args:
        self: Unused.
        example: Record with a ``'code'`` entry and a ``'prompt'`` (or
            ``'text'``) task description.

    Returns:
        The prompt text, ending with ``def <name>(`` so that the model
        continues the signature. ``<name>`` is taken from the first ``def``
        of the reference code, or ``solution`` if there is none.
    """
    task_description = example.get('prompt', example.get('text', ''))

    func_name = 'solution'
    if 'def ' in example['code']:
        try:
            func_name = example['code'].split('def ')[1].split('(')[0].strip()
        except Exception:
            # Best effort: keep the default name if the code cannot be split
            pass

    # One entry per prompt line; the two trailing spaces on the "Example 2"
    # line are part of the original prompt text and kept on purpose.
    prompt_lines = [
        f"# Task: {task_description}",
        "# Write a complete Python function to solve this task.",
        "# Requirements:",
        "# 1. Write ONLY the function code, no additional comments or test cases",
        "# 2. Make sure the function is syntactically correct",
        "# 3. Use appropriate parameter names",
        "# 4. Return the result, don't print it",
        "# Example 1: Write a function to calculate area of rectangle",
        "# def area_rectangle(length, width):",
        "#     return length * width",
        "",
        "# Example 2: Write a function to find factorial of a number  ",
        "# def factorial(n):",
        "#     if n == 0:",
        "#         return 1",
        "#     else:",
        "#         return n * factorial(n-1)",
        "",
        "# Now write the requested function:",
        f"def {func_name}(",
    ]
    return "\n".join(prompt_lines)
# ==================== Evaluation Pipeline ====================
class HallucinationEvaluator:
    """Runs generation and hallucination detection over MBPP samples and reports statistics."""

    def __init__(self, config: "Config"):
        """Create the model wrapper, dataset and detector from ``config``.

        Reads ``config.MODEL_NAME``, ``config.DEVICE`` and
        ``config.NUM_SAMPLES``.
        """
        self.config = config
        self.model = CodeLlamaInference(config.MODEL_NAME, config.DEVICE)
        self.dataset = MBPPDataset(config.NUM_SAMPLES)
        self.detector = HallucinationDetector()
        self.results = []

    def run_evaluation(self):
        """Run full evaluation pipeline

        For each dataset sample: build the prompt, generate a completion, run
        the detector and append one result dict to ``self.results``. Prints a
        summary at the end.

        Returns:
            ``self.results`` (list of dicts).
        """
        print(f"\n{'='*60}")
        print("Starting Hallucination Detection Evaluation")
        print(f"{'='*60}\n")

        # The dataset clamps the sample count to its size, so use its value
        # (not config.NUM_SAMPLES) for the progress-bar total.
        for idx, example in enumerate(tqdm(self.dataset.iterate_samples(),
                                           total=self.dataset.num_samples,
                                           desc="Evaluating")):
            # Get prompt and reference
            prompt = self.dataset.get_prompt(example)
            reference_code = self.dataset.get_reference_code(example)
            test_cases = self.dataset.get_test_cases(example)

            # Generate code with LLM
            generated_code = self.model.generate_code(
                prompt,
                max_new_tokens=self.config.MAX_NEW_TOKENS,
                temperature=self.config.TEMPERATURE,
                top_p=self.config.TOP_P
            )

            # Detect hallucination
            detection_result = self.detector.detect_hallucination(
                prompt, generated_code, reference_code, test_cases
            )

            # Store inputs, output and all detector features in one flat record
            result = {
                'example_id': idx,
                'task_description': example.get('prompt', example.get('text', '')),
                'prompt': prompt,
                'generated_code': generated_code,
                'reference_code': reference_code,
                'test_cases': test_cases,
                **detection_result
            }
            self.results.append(result)

        self._print_summary()
        return self.results

    def _print_summary(self):
        """Print evaluation summary statistics (or a notice when there are no results)"""
        print(f"\n{'='*60}")
        print("Evaluation Summary")
        print(f"{'='*60}\n")

        total = len(self.results)
        if total == 0:
            # Every percentage below divides by total
            print("No results to summarize.")
            return

        hallucinated = sum(1 for r in self.results if r['has_hallucination'])

        print(f"Total Samples: {total}")
        print(f"Hallucinated: {hallucinated} ({hallucinated/total*100:.2f}%)")
        print(f"Non-Hallucinated: {total - hallucinated} ({(total-hallucinated)/total*100:.2f}%)")
        print()

        # Breakdown by feature
        syntax_errors = sum(1 for r in self.results if not r['generated_valid_syntax'])
        sig_mismatches = sum(1 for r in self.results if not r['signature_match'])
        test_failures = sum(1 for r in self.results if r['tests_passed'] == 0)

        print(f"Syntax Errors: {syntax_errors} ({syntax_errors/total*100:.2f}%)")
        print(f"Signature Mismatches: {sig_mismatches} ({sig_mismatches/total*100:.2f}%)")
        print(f"Test Failures: {test_failures} ({test_failures/total*100:.2f}%)")
        print()

        avg_tests_passed = np.mean([r['tests_passed'] for r in self.results])
        avg_length_ratio = np.mean([r['length_ratio'] for r in self.results])

        print(f"Average Tests Passed: {avg_tests_passed:.2f}")
        print(f"Average Length Ratio: {avg_length_ratio:.2f}")

    def save_results(self, output_path: str = "hallucination_results.json"):
        """Save results to JSON file"""
        with open(output_path, 'w') as f:
            json.dump(self.results, f, indent=2)
        print(f"\nResults saved to {output_path}")

    def get_classification_data(self) -> Tuple[List[str], List[int]]:
        """
        Get data for binary classification
        Returns: (codes, labels) where labels are 0 (no hallucination) or 1 (hallucination)
        """
        codes = [r['generated_code'] for r in self.results]
        labels = [r['hallucination_binary'] for r in self.results]
        return codes, labels


# ==================== Main Execution ====================
def main():
    """
    Run the pipeline once and report on it.

    Builds the configuration, runs the evaluator, saves the records to
    "hallucination_results.json", then prints the label counts followed by
    the headline features of at most the first three results.
    """
    config = Config()

    print(f"Device: {config.DEVICE}")
    print(f"Model: {config.MODEL_NAME}")
    print(f"Samples: {config.NUM_SAMPLES}")

    # Evaluate, then persist whatever was collected.
    evaluator = HallucinationEvaluator(config)
    results = evaluator.run_evaluation()
    evaluator.save_results("hallucination_results.json")

    # Label counts for the binary classification view of the results.
    codes, labels = evaluator.get_classification_data()
    print(f"\nBinary Classification Data:")
    print(f"Total samples: {len(codes)}")
    print(f"Positive (Hallucination): {sum(labels)}")
    print(f"Negative (No Hallucination): {len(labels) - sum(labels)}")

    print(f"\n{'='*60}")
    print("Sample Results (First 3)")
    print(f"{'='*60}\n")

    # Slicing caps the preview at three entries and copes with shorter lists.
    for i, result in enumerate(results[:3]):
        print(f"Example {i+1}:")
        print(f"Task: {result['task_description'][:80]}...")
        print(f"Hallucination: {result['has_hallucination']}")
        print(f"Valid Syntax: {result['generated_valid_syntax']}")
        print(f"Signature Match: {result['signature_match']}")
        print(f"Tests Passed: {result['tests_passed']}/{result['num_test_cases']}")
        print(f"Length Ratio: {result['length_ratio']:.2f}")
        print("-" * 60)


# if __name__ == "__main__":
#     main()

  from .autonotebook import tqdm as notebook_tqdm


In [1]:
"""
MBPP Hallucination Detection for Code LLMs
Compares CodeLlama-7B generated code against MBPP reference solutions
"""

import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
import json
from typing import Dict, List, Any, Tuple
import numpy as np
from tqdm import tqdm
import re
from collections import Counter
import ast
import sys
import io
import contextlib

# ==================== Configuration ====================
class Config:
    """Static settings read by the model wrapper, dataset and evaluator."""

    # Checkpoint identifier handed to the Hugging Face `from_pretrained` loaders.
    MODEL_NAME = "codellama/CodeLlama-7b-hf"

    # Fall back to the CPU only when torch cannot see a CUDA device.
    DEVICE = "cpu" if not torch.cuda.is_available() else "cuda"

    # Generation settings forwarded to `generate_code`.
    MAX_NEW_TOKENS = 512
    TEMPERATURE = 0.2
    TOP_P = 0.95

    # Evaluation sizing. NUM_SAMPLES is passed to the dataset; BATCH_SIZE is
    # not read by any code visible in this cell.
    BATCH_SIZE = 1
    NUM_SAMPLES = 5

# ==================== Model Setup ====================
class CodeLlamaInference:
    """
    Wrapper around a Hugging Face causal LM used to complete function stubs.

    The pipeline's prompts end with an unfinished ``def name(`` header, so the
    model output is a *continuation* (parameters, colon, body) and not a
    standalone function definition.
    """

    def __init__(self, model_name: str, device: str):
        """
        Load the tokenizer and the model weights.

        Args:
            model_name: Identifier passed to the `from_pretrained` loaders.
            device: "cuda" selects float16 weights, anything else float32.
        """
        print(f"Loading model: {model_name}")
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        # The EOS token doubles as the padding token.
        self.tokenizer.pad_token = self.tokenizer.eos_token
        # NOTE: the recorded notebook output shows transformers warning that
        # `torch_dtype` is deprecated in favour of `dtype`; the old name is
        # kept here so older library versions keep working.
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.float16 if device == "cuda" else torch.float32,
            device_map="auto",
            low_cpu_mem_usage=True
        )
        self.device = device
        print(f"Model loaded on {device}")

    def generate_code(self, prompt: str, max_new_tokens: int = 512,
                      temperature: float = 0.2, top_p: float = 0.95) -> str:
        """
        Sample a completion for `prompt` and trim it to one function.

        Args:
            prompt: Text to continue.
            max_new_tokens: Upper bound on the number of generated tokens.
            temperature: Sampling temperature.
            top_p: Nucleus-sampling threshold.

        Returns:
            The stripped continuation (prompt excluded) after
            `_complete_function` post-processing.
        """
        # Place the inputs on the device the model actually lives on; with
        # device_map="auto" that is decided by the loader, not by self.device.
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)
        prompt_token_count = inputs["input_ids"].shape[1]

        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=max_new_tokens,
                temperature=temperature,
                top_p=top_p,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id,
                eos_token_id=self.tokenizer.eos_token_id,
                repetition_penalty=1.1,
            )

        # Decode only the newly generated tokens. Slicing the decoded text by
        # len(prompt) assumes decode() reproduces the prompt character for
        # character, which tokenizers do not guarantee.
        new_tokens = outputs[0][prompt_token_count:]
        generated_code = self.tokenizer.decode(new_tokens, skip_special_tokens=True).strip()

        # Enhanced function completion
        generated_code = self._complete_function(generated_code, prompt)

        return generated_code

    def _complete_function(self, generated_code: str, prompt: str) -> str:
        """
        Trim `generated_code` to a single function and make sure it returns.

        The prompt decides the starting state: when its last non-blank line is
        a ``def`` header, the function is already open. If that header is still
        unfinished (does not end with ``:``), the first generated line is taken
        as the remainder of the header. Otherwise the function starts at the
        first generated ``def`` line, and anything before it is kept.

        Once inside the function, blank lines and lines indented at least as
        deep as the first indented body line are kept; the first shallower
        line (including a following top-level ``def``) ends the function and
        everything after it is dropped. If the kept text contains no
        ``return``, an indented ``return None`` is appended.

        Args:
            generated_code: Model continuation (prompt excluded).
            prompt: The prompt that produced it.

        Returns:
            The trimmed code; empty/whitespace-only input is returned as is.
        """
        if not generated_code.strip():
            return generated_code

        # Derive the initial parser state from the end of the prompt.
        prompt_lines = prompt.rstrip().splitlines()
        last_prompt_line = prompt_lines[-1].strip() if prompt_lines else ''
        in_function = last_prompt_line.startswith('def ')
        header_pending = in_function and not last_prompt_line.endswith(':')

        completed_lines = []
        expected_indent = None

        for line in generated_code.split('\n'):
            stripped = line.strip()

            if header_pending:
                # Remainder of the `def` line opened by the prompt.
                completed_lines.append(line)
                header_pending = False
                continue

            if not in_function:
                # Preamble before the function; a `def` line opens it.
                if stripped.startswith('def '):
                    in_function = True
                completed_lines.append(line)
                continue

            if not stripped:  # Blank lines never end the function
                completed_lines.append(line)
                continue

            # The first indented line fixes the body indentation.
            current_indent = len(line) - len(line.lstrip())
            if expected_indent is None and current_indent > 0:
                expected_indent = current_indent

            if expected_indent is not None and current_indent >= expected_indent:
                completed_lines.append(line)
            else:
                # Dedented line: the function is over.
                break

        result = '\n'.join(completed_lines)

        # Give the function an explicit return when the model wrote none.
        if in_function and 'return' not in result:
            if expected_indent is None:
                expected_indent = 4
            result += '\n' + ' ' * expected_indent + 'return None'

        return result

# ==================== MBPP Dataset Loader ====================
class MBPPDataset:
    """
    Small in-memory stand-in for MBPP.

    Every example is a dict with a 'prompt' (task text), 'code' (reference
    solution) and 'test_list' (assert statements as strings).
    """

    def __init__(self, num_samples: int = 5):
        """
        Simulated MBPP dataset since we can't load the real one

        Args:
            num_samples: Requested number of examples; clamped to the range
                [0, number of available examples].
        """
        print("Creating simulated MBPP dataset...")
        self.dataset = self._create_sample_data()
        # Clamp so a negative request cannot leak into progress reporting.
        self.num_samples = max(0, min(num_samples, len(self.dataset)))
        print(f"Created {len(self.dataset)} samples, using {self.num_samples}")

    def _create_sample_data(self) -> List[Dict[str, Any]]:
        """Create sample MBPP-like data for testing"""
        samples = [
            {
                'prompt': 'Write a function to find the n largest integers from a given list of numbers, returned in descending order.',
                'code': """import heapq as hq
def heap_queue_largest(nums, n):
    largest_nums = hq.nlargest(n, nums)
    return largest_nums""",
                'test_list': [
                    'assert heap_queue_largest([25, 35, 22, 85, 14, 65, 75, 22, 58], 3) == [85, 75, 65]',
                    'assert heap_queue_largest([25, 35, 22, 85, 14, 65, 75, 22, 58], 2) == [85, 75]',
                    'assert heap_queue_largest([25, 35, 22, 85, 14, 65, 75, 22, 58], 5) == [85, 75, 65, 58, 35]'
                ]
            },
            {
                'prompt': 'Write a function to check if a string is a palindrome.',
                'code': """def is_palindrome(s):
    s = s.lower().replace(" ", "")
    return s == s[::-1]""",
                'test_list': [
                    'assert is_palindrome("A man a plan a canal Panama") == True',
                    'assert is_palindrome("racecar") == True', 
                    'assert is_palindrome("hello") == False'
                ]
            },
            {
                'prompt': 'Write a function to calculate the factorial of a number.',
                'code': """def factorial(n):
    if n == 0:
        return 1
    else:
        return n * factorial(n-1)""",
                'test_list': [
                    'assert factorial(5) == 120',
                    'assert factorial(0) == 1',
                    'assert factorial(1) == 1'
                ]
            }
        ]
        return samples

    @staticmethod
    def _target_function_name(code: str, test_cases: List[str]) -> str:
        """
        Pick the function name the prompt should ask the model to implement.

        Prefers the first function defined in `code` that is called in one of
        `test_cases`, so a helper defined ahead of the tested function is not
        chosen by mistake. Falls back to the first defined function, then to
        'solution' when `code` defines none.
        """
        defined = re.findall(r'^[ \t]*def\s+(\w+)\s*\(', code, flags=re.MULTILINE)
        for name in defined:
            call = re.compile(r'\b' + re.escape(name) + r'\s*\(')
            if any(call.search(test) for test in test_cases):
                return name
        return defined[0] if defined else 'solution'

    def get_prompt(self, example: Dict) -> str:
        """
        Format an example into a code-generation prompt.

        The prompt is a block of `#` instructions followed by an unfinished
        ``def <name>(`` header that the model is expected to continue.

        Args:
            example: Dict with 'code' and, optionally, 'prompt' and 'test_list'.

        Returns:
            The prompt string, ending in ``def <name>(``.
        """
        task_description = example.get('prompt', '')

        # Name the stub after the function the tests exercise.
        func_name = self._target_function_name(
            example['code'], example.get('test_list', [])
        )

        # Enhanced prompt with better instructions
        prompt = f"""# Task: {task_description}
# Write a complete Python function to solve this task.
# Requirements:
# 1. Write ONLY the function code
# 2. Make sure the function is complete and syntactically correct
# 3. Include proper parameter names based on the task
# 4. Return the result, don't print it
# 5. Ensure the function has proper indentation

def {func_name}("""
        return prompt

    def get_reference_code(self, example: Dict) -> str:
        """Return the reference solution stored under 'code'."""
        return example['code']

    def get_test_cases(self, example: Dict) -> List[str]:
        """Return the assert statements stored under 'test_list'."""
        return example['test_list']

    def iterate_samples(self):
        """Yield the first `num_samples` examples in dataset order."""
        for i in range(self.num_samples):
            yield self.dataset[i]

# ==================== Code Analysis ====================
class CodeAnalyzer:
    """
    Stateless helpers that inspect Python source text.

    Every helper is best-effort: source that cannot be parsed yields a
    fallback value and never raises. Only `Exception` subclasses are
    swallowed, so KeyboardInterrupt and SystemExit still propagate.
    """

    @staticmethod
    def extract_function_signature(code: str) -> str:
        """
        Describe the first function found in `code` as ``name(arg1, arg2)``.

        When the code parses, the names of the positional parameters
        (`node.args.args`) of the first `ast.FunctionDef` reached by
        `ast.walk` are used. Otherwise a regex over the raw text is tried; it
        returns the parameter text verbatim and accepts an optional
        ``-> annotation`` before the colon.

        Returns:
            The signature string, or "" when no function header is found.
        """
        try:
            tree = ast.parse(code)
        except Exception:
            tree = None

        if tree is not None:
            for node in ast.walk(tree):
                if isinstance(node, ast.FunctionDef):
                    args = [arg.arg for arg in node.args.args]
                    return f"{node.name}({', '.join(args)})"

        # Fallback: regex extraction (unparseable code, or no function found)
        match = re.search(r'def\s+(\w+)\s*\((.*?)\)\s*(?:->[^:\n]*)?:', code)
        if match:
            return f"{match.group(1)}({match.group(2)})"
        return ""

    @staticmethod
    def is_valid_python(code: str) -> bool:
        """Return True when `ast.parse` accepts `code`, False otherwise."""
        try:
            ast.parse(code)
        except Exception:
            return False
        return True

    @staticmethod
    def extract_imports(code: str) -> set:
        """
        Collect the module names imported anywhere in `code`.

        ``import a.b`` contributes "a.b"; ``from m import x`` contributes "m".
        Relative imports without a module name are ignored.

        Returns:
            A set of module names; empty when `code` cannot be parsed.
        """
        imports = set()
        try:
            tree = ast.parse(code)
        except Exception:
            return imports

        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                imports.update(alias.name for alias in node.names)
            elif isinstance(node, ast.ImportFrom) and node.module:
                imports.add(node.module)
        return imports

    @staticmethod
    def extract_function_body(code: str) -> str:
        """
        Return the source text of the first function found in `code`.

        Despite the name, the whole definition (header included) is returned.
        `code` itself is returned unchanged when it cannot be parsed, contains
        no function, or the source segment cannot be recovered.
        """
        try:
            tree = ast.parse(code)
        except Exception:
            return code

        for node in ast.walk(tree):
            if isinstance(node, ast.FunctionDef):
                segment = ast.get_source_segment(code, node)
                return segment if segment is not None else code
        return code

# ==================== Hallucination Detection ====================
class HallucinationDetector:
    """
    Derives features from a generated completion and labels it.

    A completion is labelled a hallucination when it is not valid Python, its
    parameter count differs from the reference, it fails every available test,
    or its length is far from the reference length.
    """

    def __init__(self):
        self.analyzer = CodeAnalyzer()

    def detect_hallucination(self, prompt: str, generated_code: str,
                            reference_code: str, test_cases: List[str]) -> Dict:
        """
        Compute the feature dict and the binary hallucination label.

        Args:
            prompt: Prompt given to the model (ends in an open `def` header).
            generated_code: Model continuation of the prompt.
            reference_code: Reference solution.
            test_cases: Assert statements executed against the generated code.

        Returns:
            Dict with the keys 'generated_valid_syntax',
            'reference_valid_syntax', 'signature_match', 'generated_signature',
            'reference_signature', 'imports_overlap', 'length_ratio',
            'num_test_cases', 'tests_passed', 'has_hallucination' and
            'hallucination_binary'.
        """
        # First, ensure we have a complete function
        complete_generated = self._ensure_complete_function(prompt, generated_code)

        features = {}

        # Feature 1: Syntax validity
        features['generated_valid_syntax'] = self.analyzer.is_valid_python(complete_generated)
        features['reference_valid_syntax'] = self.analyzer.is_valid_python(reference_code)

        # Feature 2: Function signature match
        gen_sig = self.analyzer.extract_function_signature(complete_generated)
        ref_sig = self.analyzer.extract_function_signature(reference_code)
        features['signature_match'] = self._compare_signatures(gen_sig, ref_sig)
        features['generated_signature'] = gen_sig
        features['reference_signature'] = ref_sig

        # Feature 3: Import comparison (Jaccard overlap; 0.0 when neither side imports)
        gen_imports = self.analyzer.extract_imports(complete_generated)
        ref_imports = self.analyzer.extract_imports(reference_code)
        union_imports = gen_imports | ref_imports
        features['imports_overlap'] = (
            len(gen_imports & ref_imports) / len(union_imports) if union_imports else 0.0
        )

        # Feature 4: Code length ratio
        gen_length = len(complete_generated.strip())
        ref_length = len(reference_code.strip())
        features['length_ratio'] = gen_length / max(ref_length, 1)

        # Feature 5: Test execution
        features['num_test_cases'] = len(test_cases)
        features['tests_passed'] = self._execute_tests(complete_generated, test_cases)

        # "Failed every test" only carries information when tests exist; with
        # an empty test list tests_passed is 0 by construction.
        all_tests_failed = features['num_test_cases'] > 0 and features['tests_passed'] == 0

        # Binary Classification: Has Hallucination?
        has_hallucination = (
            not features['generated_valid_syntax'] or
            not features['signature_match'] or
            all_tests_failed or
            features['length_ratio'] < 0.2 or
            features['length_ratio'] > 5.0
        )

        features['has_hallucination'] = has_hallucination
        features['hallucination_binary'] = 1 if has_hallucination else 0

        return features

    def _ensure_complete_function(self, prompt: str, generated_code: str) -> str:
        """
        Join prompt and completion and return one function's source.

        If the joined text parses and defines a top-level function, the source
        of the first such function is returned. Otherwise a line-based
        fallback keeps the first `def` line plus the blank or deeper-indented
        lines that follow it, appending ``return None`` when that text contains
        no ``return``. When no `def` line exists the joined text is returned.
        """
        full_code = prompt + generated_code

        # Try to extract a complete function
        try:
            tree = ast.parse(full_code)
        except Exception:
            tree = None

        if tree is not None:
            functions = [node for node in tree.body if isinstance(node, ast.FunctionDef)]
            if functions:
                segment = ast.get_source_segment(full_code, functions[0])
                if segment is not None:
                    return segment

        # Fallback: try to complete the function manually
        function_lines = []
        in_function = False
        base_indent = None

        for line in full_code.split('\n'):
            if line.strip().startswith('def '):
                if in_function:
                    break  # We already have a function, don't start another
                in_function = True
                function_lines.append(line)
                # Indentation of the header; body lines must be deeper.
                base_indent = len(line) - len(line.lstrip())
            elif in_function:
                current_indent = len(line) - len(line.lstrip())
                if line.strip() == '' or current_indent > base_indent:
                    function_lines.append(line)
                else:
                    break  # End of function

        result = '\n'.join(function_lines)

        # If we don't have a return statement, add one
        if in_function and 'return' not in result and base_indent is not None:
            result += '\n' + ' ' * (base_indent + 4) + 'return None'

        return result if in_function else full_code

    def _compare_signatures(self, gen_sig: str, ref_sig: str) -> bool:
        """
        Compare two ``name(params)`` strings by parameter count only.

        Function and parameter names are deliberately not compared. Returns
        False when either string is empty or not of the ``name(...)`` form.
        """
        if not gen_sig or not ref_sig:
            return False

        gen_match = re.match(r'(\w+)\((.*)\)', gen_sig)
        ref_match = re.match(r'(\w+)\((.*)\)', ref_sig)

        if not gen_match or not ref_match:
            return False

        # Compare number of parameters (more flexible)
        gen_param_count = len([p for p in gen_match.group(2).split(',') if p.strip()])
        ref_param_count = len([p for p in ref_match.group(2).split(',') if p.strip()])

        return gen_param_count == ref_param_count

    def _execute_tests(self, code: str, test_cases: List[str]) -> int:
        """
        Run `code`, then each test statement, and count the tests that pass.

        SECURITY NOTE: `exec` runs model-generated code inside this process.
        It is NOT sandboxed and has no timeout, so generated code can touch
        the file system or loop forever. Only use it on trusted machines.

        Args:
            code: Source defining the function under test.
            test_cases: Statements (typically asserts) executed one by one,
                each in its own copy of the namespace produced by `code`.

        Returns:
            Number of test statements that ran without raising; 0 when `code`
            is not valid Python or raises while being executed.
        """
        if not self.analyzer.is_valid_python(code):
            return 0

        namespace = {}

        # SystemExit is caught alongside Exception so that generated code
        # calling exit()/sys.exit() cannot abort the whole evaluation run.
        try:
            with contextlib.redirect_stdout(io.StringIO()):
                with contextlib.redirect_stderr(io.StringIO()):
                    exec(code, namespace)
        except (Exception, SystemExit):
            # Code execution failed
            return 0

        passed = 0
        for test in test_cases:
            # Fresh copy so one test cannot rebind names seen by the next.
            test_namespace = namespace.copy()
            try:
                # Silence anything the code under test prints.
                with contextlib.redirect_stdout(io.StringIO()):
                    with contextlib.redirect_stderr(io.StringIO()):
                        exec(test, test_namespace)
            except (Exception, SystemExit):
                # Test failed
                continue
            passed += 1

        return passed

# ==================== Evaluation Pipeline ====================
class HallucinationEvaluator:
    """
    Runs the pipeline end to end: prompt -> generation -> detection -> report.

    Results are kept in `self.results` as a list of plain dicts, one per
    successfully processed example.
    """

    def __init__(self, config: "Config"):
        """
        Build the model wrapper, dataset and detector from `config`.

        Args:
            config: Object exposing MODEL_NAME, DEVICE, NUM_SAMPLES,
                MAX_NEW_TOKENS, TEMPERATURE and TOP_P.
        """
        self.config = config
        self.model = CodeLlamaInference(config.MODEL_NAME, config.DEVICE)
        self.dataset = MBPPDataset(config.NUM_SAMPLES)
        self.detector = HallucinationDetector()
        self.results = []

    def run_evaluation(self):
        """
        Generate and score a completion for every dataset sample.

        Each call starts from an empty result list, so calling it again on the
        same evaluator does not accumulate records from earlier runs. An
        example that raises is reported and skipped.

        Returns:
            The list of result dicts (also stored in `self.results`).
        """
        print(f"\n{'='*60}")
        print("Starting Hallucination Detection Evaluation")
        print(f"{'='*60}\n")

        # Rebind instead of clearing so lists returned by earlier runs stay intact.
        self.results = []

        # The dataset may hold fewer samples than requested; size the progress
        # bar by what is actually iterated.
        total = getattr(self.dataset, 'num_samples', self.config.NUM_SAMPLES)

        for idx, example in enumerate(tqdm(self.dataset.iterate_samples(),
                                           total=total,
                                           desc="Evaluating")):
            try:
                # Get prompt and reference
                prompt = self.dataset.get_prompt(example)
                reference_code = self.dataset.get_reference_code(example)
                test_cases = self.dataset.get_test_cases(example)

                # Generate code with LLM
                generated_code = self.model.generate_code(
                    prompt,
                    max_new_tokens=self.config.MAX_NEW_TOKENS,
                    temperature=self.config.TEMPERATURE,
                    top_p=self.config.TOP_P
                )

                # Detect hallucination
                detection_result = self.detector.detect_hallucination(
                    prompt, generated_code, reference_code, test_cases
                )

                # Store inputs, raw generation and detector features together.
                result = {
                    'example_id': idx,
                    'task_description': example.get('prompt', ''),
                    'prompt': prompt,
                    'generated_code': generated_code,
                    'reference_code': reference_code,
                    'test_cases': test_cases,
                    **detection_result
                }
                self.results.append(result)

            except Exception as e:
                print(f"Error processing example {idx}: {e}")
                continue

        self._print_summary()
        return self.results

    def _print_summary(self):
        """Print aggregate counts and averages over `self.results`."""
        print(f"\n{'='*60}")
        print("Evaluation Summary")
        print(f"{'='*60}\n")

        # Guard the percentage maths below against division by zero.
        if not self.results:
            print("No results to summarize")
            return

        total = len(self.results)
        hallucinated = sum(1 for r in self.results if r['has_hallucination'])

        print(f"Total Samples: {total}")
        print(f"Hallucinated: {hallucinated} ({hallucinated/total*100:.2f}%)")
        print(f"Non-Hallucinated: {total - hallucinated} ({(total-hallucinated)/total*100:.2f}%)")
        print()

        # Breakdown by feature
        syntax_errors = sum(1 for r in self.results if not r['generated_valid_syntax'])
        sig_mismatches = sum(1 for r in self.results if not r['signature_match'])
        test_failures = sum(1 for r in self.results if r['tests_passed'] == 0)

        print(f"Syntax Errors: {syntax_errors} ({syntax_errors/total*100:.2f}%)")
        print(f"Signature Mismatches: {sig_mismatches} ({sig_mismatches/total*100:.2f}%)")
        print(f"Test Failures: {test_failures} ({test_failures/total*100:.2f}%)")
        print()

        avg_tests_passed = np.mean([r['tests_passed'] for r in self.results])
        avg_length_ratio = np.mean([r['length_ratio'] for r in self.results])

        print(f"Average Tests Passed: {avg_tests_passed:.2f}")
        print(f"Average Length Ratio: {avg_length_ratio:.2f}")

    def save_results(self, output_path: str = "hallucination_results.json"):
        """
        Save results to JSON file

        Args:
            output_path: Destination file; overwritten if it exists.
        """
        with open(output_path, 'w') as f:
            json.dump(self.results, f, indent=2)
        print(f"\nResults saved to {output_path}")

    def get_classification_data(self) -> Tuple[List[str], List[int]]:
        """
        Get data for binary classification
        Returns: (codes, labels) where labels are 0 (no hallucination) or 1 (hallucination)
        """
        codes = [r['generated_code'] for r in self.results]
        labels = [r['hallucination_binary'] for r in self.results]
        return codes, labels

# ==================== Main Execution ====================
def main():
    """
    Run the pipeline once and report on it.

    Builds the configuration, runs the evaluator and saves the records to
    "hallucination_results.json". When at least one result was produced, it
    also prints the label counts and the headline features of at most the
    first three results.
    """
    config = Config()

    print(f"Device: {config.DEVICE}")
    print(f"Model: {config.MODEL_NAME}")
    print(f"Samples: {config.NUM_SAMPLES}")

    # Evaluate, then persist whatever was collected (possibly nothing).
    evaluator = HallucinationEvaluator(config)
    results = evaluator.run_evaluation()
    evaluator.save_results("hallucination_results.json")

    # Without results there is nothing further to report.
    if not results:
        return

    # Label counts for the binary classification view of the results.
    codes, labels = evaluator.get_classification_data()
    print(f"\nBinary Classification Data:")
    print(f"Total samples: {len(codes)}")
    print(f"Positive (Hallucination): {sum(labels)}")
    print(f"Negative (No Hallucination): {len(labels) - sum(labels)}")

    print(f"\n{'='*60}")
    print("Sample Results (First 3)")
    print(f"{'='*60}\n")

    # Slicing caps the preview at three entries and copes with shorter lists.
    for i, result in enumerate(results[:3]):
        print(f"Example {i+1}:")
        print(f"Task: {result['task_description'][:80]}...")
        print(f"Hallucination: {result['has_hallucination']}")
        print(f"Valid Syntax: {result['generated_valid_syntax']}")
        print(f"Signature Match: {result['signature_match']}")
        print(f"Tests Passed: {result['tests_passed']}/{result['num_test_cases']}")
        print(f"Length Ratio: {result['length_ratio']:.2f}")
        print("-" * 60)

# Entry point: run the full pipeline when this code is executed directly.
# The recorded output below this cell shows that executing the cell in the
# notebook triggers main() (model load, evaluation, summary).
if __name__ == "__main__":
    main()

  from .autonotebook import tqdm as notebook_tqdm


Device: cuda
Model: codellama/CodeLlama-7b-hf
Samples: 5
Loading model: codellama/CodeLlama-7b-hf


`torch_dtype` is deprecated! Use `dtype` instead!
Loading checkpoint shards: 100%|██████████████████| 2/2 [00:03<00:00,  1.87s/it]


Model loaded on cuda
Creating simulated MBPP dataset...
Created 3 samples, using 3

Starting Hallucination Detection Evaluation



Evaluating:  60%|███████████████████▊             | 3/5 [00:39<00:26, 13.28s/it]


Evaluation Summary

Total Samples: 3
Hallucinated: 1 (33.33%)
Non-Hallucinated: 2 (66.67%)

Syntax Errors: 0 (0.00%)
Signature Mismatches: 0 (0.00%)
Test Failures: 1 (33.33%)

Average Tests Passed: 1.67
Average Length Ratio: 1.79

Results saved to hallucination_results.json

Binary Classification Data:
Total samples: 3
Positive (Hallucination): 1
Negative (No Hallucination): 2

Sample Results (First 3)

Example 1:
Task: Write a function to find the n largest integers from a given list of numbers, re...
Hallucination: True
Valid Syntax: True
Signature Match: True
Tests Passed: 0/3
Length Ratio: 2.89
------------------------------------------------------------
Example 2:
Task: Write a function to check if a string is a palindrome....
Hallucination: False
Valid Syntax: True
Signature Match: True
Tests Passed: 2/3
Length Ratio: 1.39
------------------------------------------------------------
Example 3:
Task: Write a function to calculate the factorial of a number....
Hallucination: False




In [2]:
# Initialize configuration
config = Config()

# Echo the settings this session will use.
print(f"Device: {config.DEVICE}")
print(f"Model: {config.MODEL_NAME}")
print(f"Samples: {config.NUM_SAMPLES}")

# Build the evaluator only; per the recorded output this step loads the model
# and the dataset. The evaluation itself is started in the next cell.
evaluator = HallucinationEvaluator(config)


Device: cuda
Model: codellama/CodeLlama-7b-hf
Samples: 5
Loading model: codellama/CodeLlama-7b-hf


`torch_dtype` is deprecated! Use `dtype` instead!
Loading checkpoint shards: 100%|██████████████████| 2/2 [00:04<00:00,  2.09s/it]


Model loaded on cuda
Loading MBPP dataset...
Loaded 427 total samples, using 5


In [3]:
# Run the evaluation on the `evaluator` built in the previous cell; the
# returned list is reused by the cells below.
results = evaluator.run_evaluation()

# Save results
evaluator.save_results("hallucination_results.json")

# Get binary classification data
codes, labels = evaluator.get_classification_data()
print(f"\nBinary Classification Data:")
print(f"Total samples: {len(codes)}")
print(f"Positive (Hallucination): {sum(labels)}")
print(f"Negative (No Hallucination): {len(labels) - sum(labels)}")

# Print only the banner for the sample results; the per-example loop lives in
# the next cell.
print(f"\n{'='*60}")
print("Sample Results (First 3)")
print(f"{'='*60}\n")




Starting Hallucination Detection Evaluation



Evaluating: 100%|█████████████████████████████████| 5/5 [01:30<00:00, 18.11s/it]


Evaluation Summary

Total Samples: 5
Hallucinated: 5 (100.00%)
Non-Hallucinated: 0 (0.00%)

Syntax Errors: 0 (0.00%)
Signature Mismatches: 5 (100.00%)
Test Failures: 5 (100.00%)

Average Tests Passed: 0.00
Average Length Ratio: 1.24

Results saved to hallucination_results.json

Binary Classification Data:
Total samples: 5
Positive (Hallucination): 5
Negative (No Hallucination): 0

Sample Results (First 3)






In [4]:
# Show the headline features of at most the first three results. Requires
# `results` from the evaluation cell. Note that `i` and `result` remain
# defined in the kernel afterwards and are read by the cells below.
for i in range(min(3, len(results))):
    result = results[i]
    print(f"Example {i+1}:")
    # Full task text, printed on its own line (no truncation here).
    print(f"""Task: 
    {result['task_description']}""")
    print(f"Hallucination: {result['has_hallucination']}")
    print(f"Valid Syntax: {result['generated_valid_syntax']}")
    print(f"Signature Match: {result['signature_match']}")
    print(f"Tests Passed: {result['tests_passed']}/{result['num_test_cases']}")
    print(f"Length Ratio: {result['length_ratio']:.2f}")
    print("-" * 60)

Example 1:
Task: 
    Write a function to find the shared elements from the given two lists.
Hallucination: True
Valid Syntax: True
Signature Match: False
Tests Passed: 0/3
Length Ratio: 1.62
------------------------------------------------------------
Example 2:
Task: 
    Write a python function to identify non-prime numbers.
Hallucination: True
Valid Syntax: True
Signature Match: False
Tests Passed: 0/4
Length Ratio: 0.96
------------------------------------------------------------
Example 3:
Task: 
    Write a function to find the n largest integers from a given list of numbers, returned in descending order.
Hallucination: True
Valid Syntax: True
Signature Match: False
Tests Passed: 0/3
Length Ratio: 1.06
------------------------------------------------------------


In [5]:
# Ad-hoc inspection: `result` is the loop variable left over from the cell
# above (the last example it printed), so this depends on that cell having run.
result['task_description']

'Write a function to find the n largest integers from a given list of numbers, returned in descending order.'

In [6]:
# Ad-hoc inspection of the label of the leftover `result` loop variable
# (set by the preview loop in an earlier cell).
result['has_hallucination']

True

In [7]:
# Ad-hoc inspection: display the whole record held by the leftover `result`
# loop variable (set by the preview loop in an earlier cell).
result

{'example_id': 2,
 'task_description': 'Write a function to find the n largest integers from a given list of numbers, returned in descending order.',
 'prompt': '# Task: Write a function to find the n largest integers from a given list of numbers, returned in descending order.\n# Write a Python function to solve this task\ndef heap_queue_largest(',
 'generated_code': 'arr):\n    import heapq\n    return [heapq.nlargest(n, arr)[-1] for n in range(len(arr), 0, -1)]',
 'reference_code': 'import heapq as hq\ndef heap_queue_largest(nums,n):\n  largest_nums = hq.nlargest(n, nums)\n  return largest_nums',
 'test_cases': ['assert heap_queue_largest( [25, 35, 22, 85, 14, 65, 75, 22, 58],3)==[85, 75, 65]',
  'assert heap_queue_largest( [25, 35, 22, 85, 14, 65, 75, 22, 58],2)==[85, 75]',
  'assert heap_queue_largest( [25, 35, 22, 85, 14, 65, 75, 22, 58],5)==[85, 75, 65, 58, 35]'],
 'generated_valid_syntax': True,
 'reference_valid_syntax': True,
 'signature_match': False,
 'tests_passed': 0,
 'le

In [15]:
# Second evaluation run on the existing `evaluator`; results go to a separate
# file so the first run's JSON is not overwritten.
# NOTE(review): the recorded output of this cell ends with
# KeyError: 'has_hallucination' right after the "Evaluation Summary" header,
# so the statements after run_evaluation() did not execute. Presumably the
# detector in use at that time did not emit that key. Verify before re-running.
results = evaluator.run_evaluation()

# Save results
evaluator.save_results("hallucination_results2.json")

# Get binary classification data
codes, labels = evaluator.get_classification_data()
print(f"\nBinary Classification Data:")
print(f"Total samples: {len(codes)}")
print(f"Positive (Hallucination): {sum(labels)}")
print(f"Negative (No Hallucination): {len(labels) - sum(labels)}")

# Print only the banner for the sample results; the per-example loop is in a
# separate cell.
print(f"\n{'='*60}")
print("Sample Results (First 3)")
print(f"{'='*60}\n")




Starting Hallucination Detection Evaluation



Evaluating: 100%|██████████| 10/10 [1:30:51<00:00, 545.19s/it]


Evaluation Summary






KeyError: 'has_hallucination'

In [None]:
# Show the headline features of at most the first three results of the latest
# run. Requires `results` from the evaluation cell. (The loop used to be pasted
# twice in this cell, which would have printed every example twice.)
for i in range(min(3, len(results))):
    result = results[i]
    print(f"Example {i+1}:")
    print(f"Task: {result['task_description']}...")
    print(f"Hallucination: {result['has_hallucination']}")
    print(f"Valid Syntax: {result['generated_valid_syntax']}")
    print(f"Signature Match: {result['signature_match']}")
    print(f"Tests Passed: {result['tests_passed']}/{result['num_test_cases']}")
    print(f"Length Ratio: {result['length_ratio']:.2f}")
    print("-" * 60)

# New implementation of classifier, cosine distance

In [None]:
class AdvancedCodeAnalyzer:
    """Extract surface, structural and token-level statistics from generated code.

    All features are computed from the generated code text alone; ``prompt`` and
    ``reference_code`` are accepted by ``extract_features`` but not used.
    """

    def extract_features(self, prompt: str, generated_code: str, reference_code: str = None) -> Dict:
        """Return a dict of code-quality features for ``generated_code``.

        Args:
            prompt: Task description (currently unused).
            generated_code: Source text to analyse.
            reference_code: Optional reference solution (currently unused).

        Returns:
            Dict mapping feature name to a bool/int/float value. Keys are always
            inserted in the same order.
        """
        features = {}
        
        # Syntax and Structure Features
        features['valid_syntax'] = self.is_valid_python(generated_code)
        features['code_length'] = len(generated_code)
        features['line_count'] = generated_code.count('\n') + 1
        features['avg_line_length'] = len(generated_code) / max(features['line_count'], 1)
        
        # Code Complexity Features
        features['has_imports'] = len(self.extract_imports(generated_code)) > 0
        features['function_count'] = self.count_functions(generated_code)
        features['has_loops'] = self.has_loops(generated_code)
        features['has_conditionals'] = self.has_conditionals(generated_code)
        features['has_exceptions'] = self.has_exceptions(generated_code)
        
        # Code Pattern Features
        features['has_print'] = 'print(' in generated_code
        features['has_comments'] = '#' in generated_code
        features['has_docstring'] = '"""' in generated_code or "'''" in generated_code
        features['has_todo'] = 'TODO' in generated_code.upper()
        
        # Token-based Features
        tokens = self.tokenize_code(generated_code)
        features['token_count'] = len(tokens)
        features['unique_token_ratio'] = len(set(tokens)) / max(len(tokens), 1)
        features['keyword_density'] = self.keyword_density(tokens)
        
        return features

    def _parse(self, code: str):
        """Return the AST for ``code``, or None when it cannot be parsed."""
        try:
            return ast.parse(code)
        except (SyntaxError, ValueError, TypeError, RecursionError):
            return None

    def is_valid_python(self, code: str) -> bool:
        """Return True when ``code`` parses as Python source."""
        return self._parse(code) is not None

    def extract_imports(self, code: str) -> List[str]:
        """Return the module names imported by ``code``.

        Uses the AST when the code parses; otherwise falls back to a line-based
        regex so that syntactically broken code still reports its imports.
        """
        tree = self._parse(code)
        if tree is None:
            return re.findall(r'^\s*(?:import|from)\s+([\w.]+)', code, flags=re.MULTILINE)
        modules = []
        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                modules.extend(alias.name for alias in node.names)
            elif isinstance(node, ast.ImportFrom):
                # Relative imports keep their leading dots (e.g. "..pkg").
                modules.append('.' * node.level + (node.module or ''))
        return modules

    def count_functions(self, code: str) -> int:
        """Return the number of function definitions (sync or async) in ``code``.

        Falls back to counting ``def`` headers with a regex when the code does
        not parse.
        """
        tree = self._parse(code)
        if tree is None:
            return len(re.findall(r'^\s*(?:async\s+)?def\s+\w+', code, flags=re.MULTILINE))
        return sum(
            isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))
            for node in ast.walk(tree)
        )
    
    def has_loops(self, code: str) -> bool:
        """Substring heuristic for loops; requires a leading space before the keyword."""
        return any(keyword in code for keyword in [' for ', ' while ', ' range('])
    
    def has_conditionals(self, code: str) -> bool:
        """Substring heuristic for conditionals; requires a leading space before the keyword."""
        return any(keyword in code for keyword in [' if ', ' elif ', ' else:'])
    
    def has_exceptions(self, code: str) -> bool:
        """Substring heuristic for exception handling; requires a leading space before the keyword."""
        return any(keyword in code for keyword in [' try:', ' except', ' raise ', ' finally:'])
    
    def tokenize_code(self, code: str) -> List[str]:
        """Split ``code`` into word tokens and single punctuation characters."""
        # Simple tokenization for feature extraction
        tokens = re.findall(r'\b\w+\b|[^\w\s]', code)
        return [token for token in tokens if token.strip()]
    
    def keyword_density(self, tokens: List[str]) -> float:
        """Return the fraction of ``tokens`` that are in a fixed set of Python keywords."""
        python_keywords = {'def', 'return', 'if', 'else', 'for', 'while', 'import', 'from', 
                          'class', 'try', 'except', 'with', 'as', 'in', 'is', 'and', 'or', 'not'}
        keyword_count = sum(1 for token in tokens if token in python_keywords)
        return keyword_count / max(len(tokens), 1)
class AlignmentAnalyzer:
    """Heuristic features describing how well generated code matches its prompt."""

    def extract_alignment_features(self, prompt: str, generated_code: str) -> Dict:
        """Return prompt/code alignment features.

        Args:
            prompt: Natural-language task description.
            generated_code: Source text produced for the prompt.

        Returns:
            Dict with word-overlap ratios, three prompt-mention flags and a
            ``parameter_match`` flag.
        """
        features = {}
        
        # Keyword overlap between prompt and code (case-insensitive word sets)
        prompt_keywords = set(re.findall(r'\b\w+\b', prompt.lower()))
        code_keywords = set(re.findall(r'\b\w+\b', generated_code.lower()))
        shared_keywords = prompt_keywords & code_keywords
        
        # Shared words as a fraction of the prompt's words ...
        features['keyword_overlap'] = len(shared_keywords) / max(len(prompt_keywords), 1)
        # ... and as a fraction of the code's words.
        features['prompt_coverage'] = len(shared_keywords) / max(len(code_keywords), 1)
        
        # Task-specific feature detection
        lowered_prompt = prompt.lower()
        features['mentions_test'] = 'test' in lowered_prompt
        features['mentions_function'] = 'function' in lowered_prompt
        features['mentions_return'] = 'return' in lowered_prompt
        
        # Parameter alignment
        features['parameter_match'] = self.check_parameter_alignment(prompt, generated_code)
        
        return features
    
    def check_parameter_alignment(self, prompt: str, code: str) -> bool:
        """Check whether the first function found in ``code`` uses a parameter named in ``prompt``.

        Parameter names are taken from ``parameter <name>`` phrases in the
        prompt. Because the prompt is lower-cased before matching, the function's
        argument names are lower-cased too so that mixed-case names can match.

        Returns:
            True when the prompt names no parameters and the code defines a
            function, or when at least one named parameter appears among the
            first function's positional arguments. False when the code does not
            parse or defines no function.
        """
        # Simple heuristic - look for "parameter <name>" mentions in the prompt
        expected_params = set(re.findall(r'parameter\s+(\w+)', prompt.lower()))
        
        try:
            tree = ast.parse(code)
        except (SyntaxError, ValueError, TypeError, RecursionError):
            # Unparseable code cannot be aligned with anything.
            return False

        for node in ast.walk(tree):
            if isinstance(node, ast.FunctionDef):
                if not expected_params:
                    return True
                actual_params = {arg.arg.lower() for arg in node.args.args}
                # Only the first function encountered is inspected.
                return bool(expected_params & actual_params)
        return False
class SemanticAnalyzer:
    """Compare generated code with a reference solution using AST-derived signals."""

    def __init__(self):
        # You can use sentence transformers or other embedding models
        self.similarity_threshold = 0.7
    
    def extract_semantic_features(self, generated_code: str, reference_code: str) -> Dict:
        """Return structure, function-name and import similarity between the two sources.

        Each value is a float in [0, 1].
        """
        features = {}
        
        # Code structure similarity
        features['structure_similarity'] = self.structure_similarity(generated_code, reference_code)
        
        # Function name similarity
        features['func_name_similarity'] = self.function_name_similarity(generated_code, reference_code)
        
        # Import similarity
        features['import_similarity'] = self.import_similarity(generated_code, reference_code)
        
        return features
    
    def structure_similarity(self, code1: str, code2: str) -> float:
        """Return the multiset Jaccard similarity of AST node-type counts.

        Returns 0.0 when either source cannot be parsed.
        """
        tree1 = self._parse(code1)
        tree2 = self._parse(code2)
        if tree1 is None or tree2 is None:
            return 0.0

        # Count different node types
        nodes1 = Counter(type(node).__name__ for node in ast.walk(tree1))
        nodes2 = Counter(type(node).__name__ for node in ast.walk(tree2))

        # Weighted Jaccard: sum of per-type minima over sum of per-type maxima
        all_nodes = set(nodes1.keys()) | set(nodes2.keys())
        intersection = sum(min(nodes1.get(node, 0), nodes2.get(node, 0)) for node in all_nodes)
        union = sum(max(nodes1.get(node, 0), nodes2.get(node, 0)) for node in all_nodes)

        return intersection / union if union > 0 else 0.0

    def function_name_similarity(self, code1: str, code2: str) -> float:
        """Return the Jaccard similarity of the sets of function names defined in each source."""
        return self._jaccard(self._function_names(code1), self._function_names(code2))

    def import_similarity(self, code1: str, code2: str) -> float:
        """Return the Jaccard similarity of the sets of modules imported by each source."""
        return self._jaccard(self._imported_modules(code1), self._imported_modules(code2))

    @staticmethod
    def _parse(code):
        """Return the AST for ``code``, or None when it cannot be parsed."""
        try:
            return ast.parse(code)
        except (SyntaxError, ValueError, TypeError, RecursionError):
            return None

    @staticmethod
    def _jaccard(first: set, second: set) -> float:
        """Jaccard similarity of two sets; two empty sets count as identical (1.0)."""
        if not first and not second:
            return 1.0
        return len(first & second) / len(first | second)

    def _function_names(self, code) -> set:
        """Names of all (sync or async) functions defined in ``code``; regex fallback if unparseable."""
        tree = self._parse(code)
        if tree is None:
            if not isinstance(code, str):
                return set()
            return set(re.findall(r'^\s*(?:async\s+)?def\s+(\w+)', code, flags=re.MULTILINE))
        return {
            node.name
            for node in ast.walk(tree)
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))
        }

    def _imported_modules(self, code) -> set:
        """Module names imported by ``code``; regex fallback if unparseable."""
        tree = self._parse(code)
        if tree is None:
            if not isinstance(code, str):
                return set()
            return set(re.findall(r'^\s*(?:import|from)\s+([\w.]+)', code, flags=re.MULTILINE))
        modules = set()
        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                modules.update(alias.name for alias in node.names)
            elif isinstance(node, ast.ImportFrom):
                modules.add('.' * node.level + (node.module or ''))
        return modules
def create_positive_dataset(mbpp_dataset, num_samples: int = 1000):
    """Collect up to ``num_samples`` MBPP reference solutions as label-0 records.

    Args:
        mbpp_dataset: Object whose ``iterate_samples()`` yields dict-like samples
            with a ``'code'`` entry and an optional ``'prompt'`` entry.
        num_samples: Maximum number of records to return.

    Returns:
        List of dicts with keys ``prompt``, ``code``, ``label``, ``tests_passed``
        and ``source``.
    """
    collected = []

    for sample in mbpp_dataset.iterate_samples():
        # Stop as soon as the quota has been reached.
        if num_samples <= len(collected):
            break

        record = {
            'prompt': sample.get('prompt', ''),
            'code': sample['code'],
            'label': 0,  # Non-hallucinated
            'tests_passed': True,
            'source': 'mbpp_reference'
        }
        collected.append(record)

    return collected
def create_negative_dataset(positive_examples, num_samples: int = 1000):
    """Create hallucinated (label 1) examples by perturbing non-hallucinated ones.

    For each positive example the perturbation strategies are tried in random
    order until one actually changes the code, so a record labelled as
    hallucinated is never byte-identical to its source. An example that no
    strategy can change is skipped.

    Args:
        positive_examples: Iterable of dicts with ``'prompt'`` and ``'code'`` keys.
        num_samples: Maximum number of negative examples to return.

    Returns:
        List of dicts with keys ``prompt``, ``code``, ``label``,
        ``perturbation_type`` and ``source``.
    """
    # The helpers are module-level functions (not methods), so they are
    # referenced directly rather than through a non-existent ``self``.
    perturbation_strategies = [
        ('syntax_error', introduce_syntax_errors),
        ('logical_error', introduce_logical_errors),
        ('semantic_error', introduce_semantic_errors),
        ('structural_error', introduce_structural_errors),
        ('irrelevant_code', add_irrelevant_code)
    ]
    negative_examples = []
    
    for positive in positive_examples:
        if len(negative_examples) >= num_samples:
            break
            
        original_code = positive['code']
        # Apply random perturbations, falling through to the next strategy
        # whenever the chosen one leaves the code untouched.
        shuffled = random.sample(perturbation_strategies, len(perturbation_strategies))
        for strategy_name, strategy_fn in shuffled:
            perturbed_code = strategy_fn(None, original_code)
            if perturbed_code == original_code:
                continue
            negative_examples.append({
                'prompt': positive['prompt'],
                'code': perturbed_code,
                'label': 1,  # Hallucinated
                'perturbation_type': strategy_name,
                'source': 'perturbed'
            })
            break
    
    return negative_examples


def _apply_effective_mutation(code: str, mutations) -> str:
    """Apply one randomly chosen mutation that really alters ``code``.

    Returns ``code`` unchanged only when none of the mutations has any effect.
    """
    variants = [mutate(code) for mutate in mutations]
    changed = [variant for variant in variants if variant != code]
    return random.choice(changed) if changed else code


def introduce_syntax_errors(self, code: str) -> str:
    """Break the code's syntax or make it fail at run time.

    ``self`` is unused; it is kept only so the existing call signature stays
    valid (pass ``None``). Not every mutation is a strict syntax error: the
    ``return`` and undefined-call variants produce code that still parses.
    """
    errors = [
        lambda c: c.replace(':', ''),  # Remove colon
        lambda c: c.replace('def ', 'def'),  # Remove space after def
        lambda c: c.replace('return ', 'return'),  # Remove space after return
        lambda c: c + '\n    undefined_function()',  # Call undefined function
        # The original regex here replaced "name(" with itself (a no-op);
        # dropping the first closing parenthesis unbalances the code instead.
        lambda c: c.replace(')', '', 1),
    ]
    return _apply_effective_mutation(code, errors)


def introduce_logical_errors(self, code: str) -> str:
    """Swap an operator/comparison or offset return values. ``self`` is unused (pass ``None``)."""
    errors = [
        lambda c: c.replace('+', '-'),  # Change operator
        lambda c: c.replace('*', '/'),  # Change operator
        lambda c: c.replace('==', '!='),  # Change comparison
        lambda c: c.replace('>', '<'),  # Change comparison
        lambda c: re.sub(r'return (.*)', r'return \1 + 1', c),  # Add offset
    ]
    return _apply_effective_mutation(code, errors)


def introduce_semantic_errors(self, code: str) -> str:
    """Substitute a wrong method, function name or variable name. ``self`` is unused (pass ``None``)."""
    errors = [
        lambda c: c.replace('sort', 'reverse'),  # Wrong method
        lambda c: c.replace('append', 'extend'),  # Wrong method
        lambda c: re.sub(r'def (\w+)', r'def wrong_\1', c),  # Wrong function name
        lambda c: re.sub(r'(\w+) =', r'wrong_\1 =', c),  # Wrong variable name
    ]
    return _apply_effective_mutation(code, errors)


def introduce_structural_errors(self, code: str) -> str:
    """Damage the code's block structure. ``self`` is unused (pass ``None``)."""

    def drop_return_values(c):
        # Replace every return statement with ``pass`` so nothing is returned.
        return re.sub(r'^([ \t]*)return\b.*$', r'\1pass', c, flags=re.MULTILINE)

    def flatten_indentation(c):
        # Remove leading whitespace from every line, destroying nesting.
        return ''.join(line.lstrip(' \t') for line in c.splitlines(keepends=True))

    def truncate_body(c):
        # Keep only the first half of the lines (at least one).
        lines = c.splitlines(keepends=True)
        return ''.join(lines[:max(1, len(lines) // 2)])

    return _apply_effective_mutation(code, [drop_return_values, flatten_indentation, truncate_body])


def add_irrelevant_code(self, code: str) -> str:
    """Append a snippet unrelated to the task. ``self`` is unused (pass ``None``)."""
    snippets = [
        '\n\nimport os\nprint(os.getcwd())',
        '\n\ndef unused_helper(values):\n    return [v for v in values if v]',
        '\n\nunrelated_total = sum(range(10))',
    ]
    return code + random.choice(snippets)
def _count_passing_tests(code: str, test_cases) -> int:
    """Execute ``code`` and return how many statements in ``test_cases`` run without error.

    SECURITY: this runs model-generated code with ``exec`` in the current
    process, with no sandbox and no timeout. Only use it in an isolated,
    disposable environment.
    """
    namespace = {}
    try:
        exec(code, namespace)
    except (Exception, SystemExit):
        # Code that cannot even be defined/imported passes no tests.
        return 0

    passed = 0
    for test in test_cases:
        try:
            # Each test gets its own copy of the globals so tests cannot
            # leak names into one another.
            exec(test, dict(namespace))
        except (Exception, SystemExit):
            continue
        passed += 1
    return passed


def create_model_generated_dataset(model, mbpp_dataset, num_samples: int = 500, execute_tests=None):
    """Generate code with ``model`` and label each sample by its test results.

    Args:
        model: Object exposing ``generate_code(prompt) -> str``.
        mbpp_dataset: Object whose ``iterate_samples()`` yields dict-like samples
            with optional ``'prompt'`` and ``'test_list'`` entries.
        num_samples: Maximum number of examples to produce.
        execute_tests: Optional callable ``(code, test_cases) -> int`` returning
            the number of passing tests. Defaults to an in-process runner (see
            the security note on ``_count_passing_tests``). The original code
            called ``self.execute_tests`` here, which does not exist in a
            module-level function.

    Returns:
        List of dicts with keys ``prompt``, ``code``, ``label``, ``tests_passed``,
        ``total_tests`` and ``source``. ``label`` is 0 when every test passes and
        1 otherwise; note that a sample with no tests therefore gets label 0.
    """
    if execute_tests is None:
        execute_tests = _count_passing_tests

    generated_examples = []
    
    for example in mbpp_dataset.iterate_samples():
        if len(generated_examples) >= num_samples:
            break
            
        prompt = example.get('prompt', '')
        generated_code = model.generate_code(prompt)
        test_cases = example.get('test_list', [])
        
        # Determine label based on test execution
        tests_passed = execute_tests(generated_code, test_cases)
        label = 0 if tests_passed == len(test_cases) else 1
        
        generated_examples.append({
            'prompt': prompt,
            'code': generated_code,
            'label': label,
            'tests_passed': tests_passed,
            'total_tests': len(test_cases),
            'source': 'model_generated'
        })
    
    return generated_examples
class HallucinationFeatureEngineer:
    """Combines the code, alignment and semantic analyzers into one feature dict."""

    def __init__(self):
        self.code_analyzer = AdvancedCodeAnalyzer()
        self.alignment_analyzer = AlignmentAnalyzer()
        self.semantic_analyzer = SemanticAnalyzer()
    
    def extract_all_features(self, prompt: str, generated_code: str, reference_code: str = None) -> Dict:
        """Return the merged feature dict for one prompt/code pair.

        Code-quality features come first, then prompt-alignment features, then
        (only when ``reference_code`` is truthy) semantic-similarity features.
        Later groups overwrite earlier ones on key collisions.
        """
        quality_part = self.code_analyzer.extract_features(prompt, generated_code)
        alignment_part = self.alignment_analyzer.extract_alignment_features(prompt, generated_code)

        merged = {}
        merged.update(quality_part)
        merged.update(alignment_part)

        # Without a reference there is nothing to compare against.
        if not reference_code:
            return merged

        merged.update(
            self.semantic_analyzer.extract_semantic_features(generated_code, reference_code)
        )
        return merged

# Usage for training
def prepare_training_data(positive_examples, negative_examples, model_generated_examples):
    """Turn labelled examples into a feature matrix and a label vector.

    The three example lists are concatenated in the order given. Each row of
    the returned matrix holds the feature values in the order the feature
    engineer inserted them; no reference code is supplied.

    Returns:
        Tuple ``(X, y)`` of numpy arrays built from the per-example feature
        lists and the ``'label'`` entries.
    """
    engineer = HallucinationFeatureEngineer()
    combined = positive_examples + negative_examples + model_generated_examples

    rows, targets = [], []
    for record in combined:
        extracted = engineer.extract_all_features(
            record['prompt'],
            record['code'],
            reference_code=None  # Don't use reference in production
        )
        rows.append(list(extracted.values()))
        targets.append(record['label'])

    return np.array(rows), np.array(targets)
import lightgbm as lgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix

def train_hallucination_classifier(X, y, test_size: float = 0.2, random_state: int = 42):
    """Train a LightGBM classifier and print its hold-out evaluation.

    Args:
        X: Feature matrix.
        y: Label vector aligned with ``X``.
        test_size: Fraction of the data held out for evaluation (previously
            hard-coded to 0.2).
        random_state: Seed used for both the split and the model (previously
            hard-coded to 42).

    Returns:
        The fitted ``lgb.LGBMClassifier``.
    """
    # Split data
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state
    )
    
    # Train LightGBM
    model = lgb.LGBMClassifier(
        n_estimators=100,
        learning_rate=0.1,
        max_depth=6,
        random_state=random_state
    )
    
    model.fit(X_train, y_train)
    
    # Evaluate on the held-out split
    y_pred = model.predict(X_test)
    print(classification_report(y_test, y_pred))
    print("Confusion Matrix:")
    print(confusion_matrix(y_test, y_pred))
    
    return model

# Feature importance analysis
def analyze_feature_importance(model, feature_names):
    """Print the ten highest-scoring features of a fitted model.

    Args:
        model: Object exposing ``feature_importances_``.
        feature_names: Names paired positionally with the importance scores.
    """
    scores = model.feature_importances_
    ranked = list(zip(feature_names, scores))
    # Highest importance first; ties keep their original relative order.
    ranked.sort(key=lambda pair: pair[1], reverse=True)

    print("Feature Importance:")
    top_ten = ranked[:10]  # Top 10 features
    for name, score in top_ten:
        print(f"{name}: {score:.4f}")