# This notebook contains a preliminary analysis of the Universal Prober for LLMs

### Library imports and definition of constants

In [None]:
import json
import os
from sklearn.decomposition import PCA
import numpy as np
import torch
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression, Ridge
from sklearn.preprocessing import StandardScaler
import gc
import seaborn as sns
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score, confusion_matrix
import traceback




In [None]:

# Project root: two directory levels above the current working directory.
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.getcwd()))
# Name of the folder (joined under PROJECT_ROOT by the loaders below) that holds
# the cached activations and generation labels.
CACHE_DIR_NAME = "activation_cache"
# Hugging Face cache location: value of the HF_HOME environment variable if set,
# otherwise a literal Windows-style path.
# NOTE(review): the fallback contains an unexpanded "~" and backslashes, and this
# constant is not read anywhere in the visible cells — confirm it is still needed.
HF_DEFAULT_HOME = os.environ.get("HF_HOME", "~\\.cache\\huggingface\\hub")
# Layer indices per model name.
# NOTE(review): LAYER_CONFIG is assigned again, with different layer ranges, at the
# top of the experiment cell in the "Classifier" section; the later value wins.
LAYER_CONFIG = {
    "Qwen2.5-7B": [16,18,19],
    "Falcon3-7B-Base": [26,27]
}

## Dataset stats

In [None]:
def stats_per_json(model_name, dataset_name):
    """Summarise the hallucination labels cached for one model/dataset pair.

    Reads
    ``<PROJECT_ROOT>/<CACHE_DIR_NAME>/<model_name>/<dataset_name>/generations/hallucination_labels.json``.
    The file is iterated as a sequence of records, each providing the keys
    ``is_hallucination`` and (for hallucinated records) ``instance_id``.

    Args:
        model_name: Name of the model folder inside the cache directory.
        dataset_name: Name of the dataset folder inside the model folder.

    Returns:
        dict with the keys ``total`` (number of records), ``hallucinations``
        (number of records whose ``is_hallucination`` is truthy),
        ``percent_hallucinations`` (0 when the file holds no records),
        ``hallucinated_items`` (list of ``instance_id`` of the hallucinated
        records, in file order), ``model_name`` and ``dataset_name``.

    Raises:
        FileNotFoundError: If the labels file does not exist.
        json.JSONDecodeError: If the file is not valid JSON.
        KeyError: If a record lacks one of the required keys.
    """
    file_path = os.path.join(
        PROJECT_ROOT, CACHE_DIR_NAME, model_name, dataset_name,
        "generations", "hallucination_labels.json",
    )
    # Decode explicitly as UTF-8 instead of relying on the platform's locale
    # encoding, which is not UTF-8 on every system.
    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)

    total = len(data)
    # Single pass: the count is simply the length of the id list.
    hallucinated_items = [item['instance_id'] for item in data if item['is_hallucination']]
    hallucinations = len(hallucinated_items)
    percent_hallucinations = (hallucinations / total) * 100 if total > 0 else 0
    return {
        'total': total,
        'hallucinations': hallucinations,
        'percent_hallucinations': percent_hallucinations,
        'hallucinated_items': hallucinated_items,
        'model_name': model_name,
        'dataset_name': dataset_name
    }

In [None]:
# Per-model hallucination statistics on the belief_bank dataset.
qwen_stats = stats_per_json("Qwen2.5-7B", "belief_bank")
falcon_stats = stats_per_json("Falcon3-7B-Base", "belief_bank")
# Printed labels use the exact model identifiers passed to stats_per_json.
print("Qwen2.5-7B Hallucination Stats:", qwen_stats)
print("Falcon3-7B-Base Hallucination Stats:", falcon_stats)
# Instance ids flagged as hallucinated for both models.
common_hallucinated = set(qwen_stats['hallucinated_items']).intersection(
    falcon_stats['hallucinated_items']
)
print("Number of common hallucinated instances between Qwen2.5-7B and Falcon3-7B-Base:", len(common_hallucinated))

## Model and activations stats

In [None]:
def layers_in_model(model):
    """Infer how many layers are cached for ``model`` from its activation files.

    Looks inside ``<PROJECT_ROOT>/<CACHE_DIR_NAME>/<model>``, picks the first
    sub-directory in alphabetical order and counts the entries of its
    ``activation_attn`` folder.

    Args:
        model: Name of the model folder inside the cache directory.

    Returns:
        The number of entries in ``activation_attn`` floor-divided by 2.
        NOTE(review): presumably two files are stored per layer — confirm
        against the code that writes the cache.

    Raises:
        ValueError: If the model folder contains no sub-directory.
        FileNotFoundError: If the model folder or the ``activation_attn``
            folder of the selected sub-directory does not exist.
    """
    file_path = os.path.join(PROJECT_ROOT, CACHE_DIR_NAME, model)
    # os.listdir returns entries in arbitrary order; sorting makes the choice of
    # the inspected sub-directory reproducible across runs and machines.
    subdirs = sorted(
        d for d in os.listdir(file_path) if os.path.isdir(os.path.join(file_path, d))
    )
    if not subdirs:
        raise ValueError(f"No subdirectories found in {file_path}")
    layer_files = os.path.join(file_path, subdirs[0], "activation_attn")
    return len(os.listdir(layer_files)) // 2

# Layer counts inferred from the cached activation files of each model.
qwen_layers = layers_in_model("Qwen2.5-7B")
falcon_layers = layers_in_model("Falcon3-7B-Base")
# Printed labels use the exact model identifiers passed to layers_in_model.
print("Number of layers in Qwen2.5-7B:", qwen_layers)
print("Number of layers in Falcon3-7B-Base:", falcon_layers)

In [None]:
def createSubplots(model, dataset, num_layers, type, model_stats, dim_type,directory_to_save):
    """Save a grid of 2-D projections of the cached activations, one panel per layer.

    For every layer index in ``range(num_layers)`` the file
    ``<PROJECT_ROOT>/<CACHE_DIR_NAME>/<model>/<dataset>/activation_<type>/layer<k>_activations.pt``
    is loaded, reduced to two dimensions and drawn as a scatter plot where row
    ``i`` is red when ``i`` is in ``model_stats['hallucinated_items']`` and blue
    otherwise. Layers whose file is missing are reported and left blank.

    Args:
        model: Model folder name inside the cache directory.
        dataset: Dataset folder name inside the model folder.
        num_layers: Number of layers to plot (also determines the grid size).
        type: Activation kind used to build the ``activation_<type>`` folder
            name. The name shadows the built-in inside this function; it is
            kept because callers may pass it by keyword.
        model_stats: Mapping providing ``'hallucinated_items'``.
            NOTE(review): the ids are compared with row positions, which
            assumes ``instance_id`` equals the row index of the activation
            matrix — confirm against the code that writes the cache.
        dim_type: Reduction method. Only ``"PCA"`` is implemented; any other
            value leaves every panel empty.
        directory_to_save: Existing directory where the PNG is written.

    Returns:
        None. Writes ``<model>_<dataset>_<type>_activations_<dim_type>.png``.
    """
    from matplotlib.patches import Patch

    # Grid dimensions: four panels per row.
    cols = 4
    rows = (num_layers + cols - 1) // cols
    # squeeze=False always returns a 2-D array of Axes, whatever the grid shape.
    fig, axs = plt.subplots(rows, cols, figsize=(32, 8*rows), squeeze=False)
    axs = axs.flatten()

    # Build the set once so membership tests inside the loop are O(1).
    hallucinated_indices = set(model_stats['hallucinated_items'])

    for layer in range(num_layers):
        file_path = os.path.join(PROJECT_ROOT, CACHE_DIR_NAME, model, dataset, "activation_"+type, f"layer{layer}_activations.pt")

        # Load the activations of this layer.
        try:
            # map_location="cpu" lets tensors saved from a GPU be read on
            # CPU-only machines and avoids allocating GPU memory for plotting.
            activations = torch.load(file_path, map_location="cpu")
        except FileNotFoundError:
            print(f"File non trovato: {file_path}")
            continue

        if isinstance(activations, torch.Tensor):
            activations = activations.detach()
            # NumPy has no bfloat16 dtype, so widen before converting.
            if activations.dtype == torch.bfloat16:
                activations = activations.to(torch.float32)
            activations = activations.cpu().numpy()

        activations_2d = None
        var_text = ""

        # --- Dimensionality reduction ---
        if dim_type == "PCA":
            pca = PCA(n_components=2)
            activations_2d = pca.fit_transform(activations)
            var_text = f'(Var: {pca.explained_variance_ratio_[0]:.2%}, {pca.explained_variance_ratio_[1]:.2%})'

        # --- Plotting ---
        if activations_2d is not None:
            colors = ['red' if i in hallucinated_indices else 'blue' for i in range(activations_2d.shape[0])]

            # Small markers and partial transparency keep dense regions readable.
            axs[layer].scatter(activations_2d[:, 0], activations_2d[:, 1], c=colors, alpha=0.5, s=10)

            axs[layer].set_title(f'Layer {layer} {var_text}', fontsize=12, fontweight='bold')
            axs[layer].set_xlabel(f'{dim_type} 1', fontsize=10)
            axs[layer].set_ylabel(f'{dim_type} 2', fontsize=10)
            axs[layer].grid(True, alpha=0.3)

    # Hide the panels of the last row that do not correspond to any layer.
    for ax in axs[num_layers:]:
        ax.axis('off')

    # Figure-level legend in the top right corner.
    legend_elements = [Patch(facecolor='red', label='Hallucinated'),
                       Patch(facecolor='blue', label='Non-hallucinated')]
    fig.legend(handles=legend_elements, loc='upper right', fontsize=12, bbox_to_anchor=(0.98, 0.98))

    fig.suptitle(f'Activations {dim_type} for {model} - {type} layers\n(Red: Hallucinated, Blue: Non-hallucinated)', fontsize=16, fontweight='bold')
    fig.tight_layout()
    fig.savefig(os.path.join(directory_to_save, f'{model}_{dataset}_{type}_activations_{dim_type}.png'), dpi=150, bbox_inches='tight')
    # Close this specific figure rather than whatever pyplot considers current.
    plt.close(fig)
    print(f"Salvato plot per {model} - {type} - {dim_type}")




In [None]:
# Generate and save the per-layer projection grids for every reduction method
# and every activation kind, for both models.
for dim_type in ["PCA"]:
    directory_to_save = f"activation_plots_{dim_type}"
    os.makedirs(directory_to_save, exist_ok=True)
    # The loop variable is deliberately not called `type`: at cell level that
    # would overwrite the built-in `type` for the rest of the kernel session.
    for activation_type in ['attn', 'mlp', "hidden"]:
        createSubplots("Qwen2.5-7B", "belief_bank", qwen_layers, activation_type, qwen_stats, dim_type, directory_to_save)
        createSubplots("Falcon3-7B-Base", "belief_bank", falcon_layers, activation_type, falcon_stats, dim_type, directory_to_save)

## Classifier

In [None]:


def load_concatenated_layers(model_name, dataset_name, layer_indices, type_layer, stats):
    """Load several cached layers and concatenate them along the feature axis.

    Each layer is read from
    ``<PROJECT_ROOT>/<CACHE_DIR_NAME>/<model_name>/<dataset_name>/activation_<type_layer>/layer<k>_activations.pt``.
    Missing files are reported and skipped. Matrices with more rows than
    ``stats['total']`` are truncated to that many rows.

    Args:
        model_name: Model folder name inside the cache directory.
        dataset_name: Dataset folder name inside the model folder.
        layer_indices: Iterable of layer indices to load, in concatenation order.
        type_layer: Activation kind used to build the ``activation_<type_layer>``
            folder name.
        stats: Mapping providing ``'total'`` and ``'hallucinated_items'``.
            NOTE(review): the ids are compared with row positions, which
            assumes ``instance_id`` equals the row index — confirm against the
            code that writes the cache.

    Returns:
        Tuple ``(X, y)``: ``X`` is a float32 array with the loaded layers joined
        on axis 1; ``y`` is an int array with 1 where the row index is in
        ``stats['hallucinated_items']`` and 0 elsewhere.

    Raises:
        ValueError: If no layer could be loaded, or if the loaded layers do not
            share the same number of rows.
    """
    print(f"   Caricamento {model_name} [{type_layer}]: layers {layer_indices}...")
    combined_features = []
    y = None

    total_samples = stats['total']
    hallucinated_set = set(stats['hallucinated_items'])

    for layer_idx in layer_indices:
        file_path = os.path.join(PROJECT_ROOT, CACHE_DIR_NAME, model_name, dataset_name, "activation_"+type_layer, f"layer{layer_idx}_activations.pt")

        if not os.path.exists(file_path):
            print(f"Warning: Layer {layer_idx} non trovato in {file_path}. Salto.")
            continue

        # map_location="cpu" lets tensors saved from a GPU be read on CPU-only machines.
        activations = torch.load(file_path, map_location="cpu")
        if isinstance(activations, torch.Tensor):
            # Cast to float32 before leaving torch: NumPy has no bfloat16 dtype,
            # so calling .numpy() first would fail for bfloat16 activations.
            X_layer = activations.detach().to(torch.float32).cpu().numpy()
        else:
            X_layer = activations.astype(np.float32)

        if X_layer.shape[0] > total_samples:
            X_layer = X_layer[:total_samples]

        # Fail early with a readable message instead of an opaque concatenate error.
        if combined_features and X_layer.shape[0] != combined_features[0].shape[0]:
            raise ValueError(
                f"Layer {layer_idx} of {model_name} has {X_layer.shape[0]} rows, "
                f"expected {combined_features[0].shape[0]}"
            )

        combined_features.append(X_layer)

        if y is None:
            # Labels are derived once, from the row count of the first loaded layer.
            y = np.array(
                [int(i in hallucinated_set) for i in range(X_layer.shape[0])],
                dtype=int,
            )

    if not combined_features:
        raise ValueError(f"Nessun layer caricato per {model_name}")

    X_final = np.concatenate(combined_features, axis=1)
    return X_final, y

def _binary_classification_metrics(y_true, y_pred):
    """Return accuracy, precision, recall, F1 and the 2x2 confusion matrix for 0/1 labels."""
    return {
        "accuracy": accuracy_score(y_true, y_pred),
        "precision": precision_score(y_true, y_pred),
        "recall": recall_score(y_true, y_pred),
        "f1": f1_score(y_true, y_pred),
        # labels=[0, 1] forces a 2x2 matrix even when a class is absent from both
        # y_true and y_pred; without it the matrix can shrink to 1x1 and the
        # TN/FP/FN/TP indexing done by consumers of this result fails.
        "confusion_matrix": confusion_matrix(y_true, y_pred, labels=[0, 1]).tolist(),
    }


def run_experiment_pipeline_cached(X_teacher, y_teacher, teacher_name,
                                   X_student, y_student, student_name, layer_type,
                                   ridge_alpha=1000.0):
    """Run the cross-model probing experiment on pre-split, pre-normalised data.

    Steps:
      1. Fit a logistic-regression probe on the teacher's training activations.
      2. Fit a ridge regression mapping student training activations onto the
         teacher's training activations (rows are paired by position, so both
         models must have been split with the same indices).
      3. Project the student's test activations and classify them with the
         teacher probe.

    Args:
        X_teacher: Mapping with ``'X_train'`` and ``'X_test'`` for the teacher.
        y_teacher: Mapping with ``'y_train'`` and ``'y_test'`` for the teacher
            (may be the same mapping object as ``X_teacher``).
        teacher_name: Display name of the teacher model.
        X_student: Mapping with ``'X_train'`` and ``'X_test'`` for the student.
        y_student: Mapping with ``'y_train'`` and ``'y_test'`` for the student.
            Only ``'y_test'`` is used, to score the cross-model predictions.
        student_name: Display name of the student model.
        layer_type: Activation kind, used for logging and stored in the result.
        ridge_alpha: Regularisation strength of the alignment ridge regression.

    Returns:
        dict with ``type``, ``teacher_name``, ``student_name`` and two metric
        dicts, ``teacher`` and ``student_on_teacher``, each holding
        ``accuracy``, ``precision``, ``recall``, ``f1`` and
        ``confusion_matrix`` (a 2x2 nested list, rows = true label).
    """
    print(f"\n=== EXPERIMENT: {layer_type.upper()} LAYERS ({teacher_name} → {student_name}) ===")
    print(f"Teacher Input Shape ({teacher_name}): Train={X_teacher['X_train'].shape}, Test={X_teacher['X_test'].shape}")
    print(f"Student Input Shape ({student_name}): Train={X_student['X_train'].shape}, Test={X_student['X_test'].shape}")

    X_A_train = X_teacher['X_train']
    X_A_test = X_teacher['X_test']
    y_A_train = y_teacher['y_train']
    y_A_test = y_teacher['y_test']

    X_B_train = X_student['X_train']
    X_B_test = X_student['X_test']
    y_B_test = y_student['y_test']

    # --- STEP 1: teacher probing ---
    print(f"1. Training Teacher Probe ({teacher_name})...")
    probe_teacher = LogisticRegression(max_iter=1000, class_weight='balanced', solver='lbfgs', n_jobs=-1)
    probe_teacher.fit(X_A_train, y_A_train)
    teacher_metrics = _binary_classification_metrics(y_A_test, probe_teacher.predict(X_A_test))

    # --- STEP 2: alignment (student space -> teacher space) ---
    print(f"2. Learning Linear Projection ({student_name} → {teacher_name})...")
    # No intercept: the inputs are described as already normalised by the caller.
    aligner = Ridge(alpha=ridge_alpha, fit_intercept=False)
    aligner.fit(X_B_train, X_A_train)

    # --- STEP 3: student-on-teacher (cross-model) evaluation ---
    print(f"3. Projecting {student_name} & Testing with {teacher_name} Probe...")
    X_B_test_projected = aligner.predict(X_B_test)
    cross_metrics = _binary_classification_metrics(y_B_test, probe_teacher.predict(X_B_test_projected))

    print(f"   -> {student_name} on {teacher_name} Accuracy: {cross_metrics['accuracy']:.4f}")

    return {
        "type": layer_type,
        "teacher_name": teacher_name,
        "student_name": student_name,
        "teacher": teacher_metrics,
        "student_on_teacher": cross_metrics
    }

def plot_confusion_matrix(cm, layer_type, model_name="", save_dir="confusion_matrices"):
    """Draw a confusion matrix as an annotated heatmap and save it as a PNG.

    Args:
        cm: Confusion matrix with integer counts (rows = true label,
            columns = predicted label).
        layer_type: Activation kind; shown upper-cased in the title and used in
            the file name.
        model_name: Optional tag appended to the title and to the file name.
        save_dir: Output directory, created if it does not exist.

    Returns:
        None. The image is written to
        ``<save_dir>/confusion_matrix_<layer_type>[_<model_name>].png``.
    """
    os.makedirs(save_dir, exist_ok=True)

    class_names = ['Non-Hallucinated', 'Hallucinated']
    fig, ax = plt.subplots(figsize=(8, 6))
    sns.heatmap(
        cm,
        annot=True,
        fmt='d',
        cmap='Blues',
        cbar=True,
        ax=ax,
        xticklabels=class_names,
        yticklabels=class_names,
    )
    ax.set_ylabel('True Label')
    ax.set_xlabel('Predicted Label')

    # The model tag, when given, is appended to both the title and the file name.
    heading = f'Confusion Matrix - {layer_type.upper()} Layers'
    if model_name:
        heading += f' ({model_name})'
    ax.set_title(heading)

    plt.tight_layout()
    if model_name:
        png_name = f'confusion_matrix_{layer_type}_{model_name}.png'
    else:
        png_name = f'confusion_matrix_{layer_type}.png'
    filename = os.path.join(save_dir, png_name)
    plt.savefig(filename, dpi=150, bbox_inches='tight')
    plt.close()
    print(f"   ✓ Salvato: {filename}")


In [None]:
# Layers concatenated per model for the probing experiment.
# NOTE(review): this reassigns LAYER_CONFIG, replacing the value set in the
# constants cell at the top of the notebook.
LAYER_CONFIG = {
    "Qwen2.5-7B": list(range(19, 24)),
    "Falcon3-7B-Base": list(range(23, 27))
}

print("="*80)
print("FASE 1: PRE-CARICAMENTO E SPLITTING DEI DATI (stessi indici shuffled per TUTTI i layer type)")
print("="*80 + "\n")


# One seeded permutation of the sample indices, split 70% train / 30% test and
# reused for every layer type and for both models.
# NOTE(review): n_samples comes from qwen_stats only; indexing the Falcon arrays
# with these indices assumes they have at least as many rows and that row i
# refers to the same instance in both models — confirm.
n_samples = qwen_stats['total']
rng = np.random.RandomState(42)
shuffled_indices = rng.permutation(n_samples)
split_idx = int(0.7 * n_samples)

train_indices = shuffled_indices[:split_idx]
test_indices = shuffled_indices[split_idx:]


# data_splits[layer_type][model_key] -> dict with X_train / X_test / y_train / y_test
data_splits = {}
for layer_type in ['attn', 'mlp', 'hidden']:
    gc.collect()

    # Load Qwen
    X_qwen, y_qwen = load_concatenated_layers(
        "Qwen2.5-7B", "belief_bank",
        LAYER_CONFIG["Qwen2.5-7B"],
        layer_type, qwen_stats
    )

    # Load Falcon
    X_falcon, y_falcon = load_concatenated_layers(
        "Falcon3-7B-Base", "belief_bank",
        LAYER_CONFIG["Falcon3-7B-Base"],
        layer_type, falcon_stats
    )

    # Apply the SAME indices to both models
    X_qwen_train, X_qwen_test = X_qwen[train_indices], X_qwen[test_indices]
    y_qwen_train, y_qwen_test = y_qwen[train_indices], y_qwen[test_indices]

    X_falcon_train, X_falcon_test = X_falcon[train_indices], X_falcon[test_indices]
    y_falcon_train, y_falcon_test = y_falcon[train_indices], y_falcon[test_indices]

    # Normalise once: each scaler is fitted on the training rows only and then
    # applied to the test rows of the same model.
    scaler_qwen = StandardScaler()
    X_qwen_train = scaler_qwen.fit_transform(X_qwen_train)
    X_qwen_test = scaler_qwen.transform(X_qwen_test)

    scaler_falcon = StandardScaler()
    X_falcon_train = scaler_falcon.fit_transform(X_falcon_train)
    X_falcon_test = scaler_falcon.transform(X_falcon_test)

    # Store the splits of this layer type in the dictionary
    data_splits[layer_type] = {
        "qwen": {
            "X_train": X_qwen_train,
            "X_test": X_qwen_test,
            "y_train": y_qwen_train,
            "y_test": y_qwen_test
        },
        "falcon": {
            "X_train": X_falcon_train,
            "X_test": X_falcon_test,
            "y_train": y_falcon_train,
            "y_test": y_falcon_test
        }
    }

    # Drop the full arrays: everything needed is now stored in data_splits
    del X_qwen, y_qwen, X_falcon, y_falcon

print("\n" + "="*80)
print("FASE 2: ESECUZIONE ESPERIMENTI SU ENTRAMBI GLI SCENARI")
print("="*80 + "\n")



# Experiment scenarios: each model acts once as teacher and once as student
scenarios = [
    {
        "teacher_model": "Qwen2.5-7B",
        "student_model": "Falcon3-7B-Base",
    },
    {
        "teacher_model": "Falcon3-7B-Base",
        "student_model": "Qwen2.5-7B",
    }
]

# One entry per scenario: {"scenario": <label>, "results": [<per-layer-type result>, ...]}
all_results = []

# Loop over both scenarios
for scenario_idx, scenario in enumerate(scenarios, 1):
    print(f"\n{'='*80}")
    print(f"SCENARIO {scenario_idx}: {scenario['teacher_model']} → {scenario['student_model']}")
    print(f"{'='*80}\n")

    results = []

    # Loop over the 3 requested layer types
    for layer_type in ['attn', 'mlp', 'hidden']:

        try:
            # Fetch the pre-split, pre-normalised data
            if scenario['teacher_model'] == "Qwen2.5-7B":
                X_teacher_data = data_splits[layer_type]['qwen']
                X_student_data = data_splits[layer_type]['falcon']
            else:
                X_teacher_data = data_splits[layer_type]['falcon']
                X_student_data = data_splits[layer_type]['qwen']

            # Run the pipeline on the cached data. The same dict is passed as both
            # the X and the y argument because it holds X_train/X_test and
            # y_train/y_test together.
            res = run_experiment_pipeline_cached(
                X_teacher_data, X_teacher_data, scenario['teacher_model'],
                X_student_data, X_student_data, scenario['student_model'],
                layer_type
            )
            results.append(res)

            # 4. Plot the confusion matrices.
            # split('.')[0] keeps only the text before the first dot of the model
            # name (e.g. "Qwen2.5-7B" -> "Qwen2") when building the file tag.
            print(f"\n   Creazione visualizzazioni confusion matrices...")
            plot_confusion_matrix(
                np.array(res['teacher']['confusion_matrix']),
                layer_type,
                f"Teacher_{scenario['teacher_model'].split('.')[0]}"
            )
            plot_confusion_matrix(
                np.array(res['student_on_teacher']['confusion_matrix']),
                layer_type,
                f"{scenario['student_model'].split('.')[0]}_on_{scenario['teacher_model'].split('.')[0]}"
            )

        except Exception as e:
            # Best effort: report the failure with its traceback and continue with
            # the next layer type; the failed layer type is absent from `results`.
            print(f"Errore critico nel layer {layer_type}: {e}")
            traceback.print_exc()

    all_results.append({
        "scenario": f"{scenario['teacher_model']} (teacher) → {scenario['student_model']} (student)",
        "results": results
    })

# Save all results to JSON
os.makedirs("results_metrics", exist_ok=True)
metrics_file = "results_metrics/experiment_results_all_scenarios.json"

# Flatten every result into a JSON-friendly record: metrics rounded to 4 decimals
# and the confusion matrix unpacked into named TN/FP/FN/TP cells.
# NOTE(review): the [0][0]..[1][1] indexing assumes each confusion matrix is 2x2
# with class 0 first — confirm this holds when a class is missing from a test split.
all_results_json = []
for scenario_data in all_results:
    scenario_results = []
    for r in scenario_data['results']:
        scenario_results.append({
            "layer_type": r['type'],
            "teacher_model": r['teacher_name'],
            "student_model": r['student_name'],
            "teacher": {
                "accuracy": round(r['teacher']['accuracy'], 4),
                "precision": round(r['teacher']['precision'], 4),
                "recall": round(r['teacher']['recall'], 4),
                "f1_score": round(r['teacher']['f1'], 4),
                "confusion_matrix": {
                    "TN": int(r['teacher']['confusion_matrix'][0][0]),
                    "FP": int(r['teacher']['confusion_matrix'][0][1]),
                    "FN": int(r['teacher']['confusion_matrix'][1][0]),
                    "TP": int(r['teacher']['confusion_matrix'][1][1])
                }
            },
            "student_on_teacher": {
                "accuracy": round(r['student_on_teacher']['accuracy'], 4),
                "precision": round(r['student_on_teacher']['precision'], 4),
                "recall": round(r['student_on_teacher']['recall'], 4),
                "f1_score": round(r['student_on_teacher']['f1'], 4),
                "confusion_matrix": {
                    "TN": int(r['student_on_teacher']['confusion_matrix'][0][0]),
                    "FP": int(r['student_on_teacher']['confusion_matrix'][0][1]),
                    "FN": int(r['student_on_teacher']['confusion_matrix'][1][0]),
                    "TP": int(r['student_on_teacher']['confusion_matrix'][1][1])
                }
            }
        })

    all_results_json.append({
        "scenario": scenario_data['scenario'],
        "results": scenario_results
    })

# Write the collected records, pretty-printed with a 2-space indent.
with open(metrics_file, 'w') as f:
    json.dump(all_results_json, f, indent=2)

print(f"\n✓ Risultati salvati in: {metrics_file}")


In [None]:
# Model dimensions: print the shape of one cached activation file per model and
# per activation type.

print("=== Controllo Dimensioni Vettori ===")

dataset = "belief_bank"
types = ["attn", "mlp", "hidden"]

for model_name, layers in LAYER_CONFIG.items():
    print(f"\nModello: {model_name}")
    # Use the first layer listed in the configuration for this model
    layer_idx = layers[0]

    for type_layer in types:
        file_path = os.path.join(PROJECT_ROOT, CACHE_DIR_NAME, model_name, dataset, "activation_"+type_layer, f"layer{layer_idx}_activations.pt")

        try:
            if os.path.exists(file_path):
                # map_location="cpu": only the shape is needed, so tensors saved
                # from a GPU are read on the CPU and also load on CPU-only machines.
                activations = torch.load(file_path, map_location="cpu")
                # Works for both torch tensors and NumPy arrays
                shape = activations.shape if hasattr(activations, 'shape') else "Unknown"
                print(f"  - Type: {type_layer:<10} | Layer: {layer_idx} | Shape: {shape}")
            else:
                print(f"  - Type: {type_layer:<10} | Layer: {layer_idx} | File non trovato")
        except Exception as e:
            # Best effort: report the problem for this file and keep checking the others.
            print(f"  - Type: {type_layer:<10} | Layer: {layer_idx} | Errore: {e}")