# Spectre: Weight Outlier Detection & Integrity Fingerprint Engine

This notebook demonstrates how to use Spectre to scan model checkpoints for weight-level anomalies and tampering.


## Setup and Imports


In [None]:
import sys
from pathlib import Path
import json
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

# Add parent directory to path
sys.path.insert(0, str(Path.cwd().parent))

from spectre.core.config import Config
from spectre.core.pipeline import Pipeline
from spectre.core.output import generate_all_outputs
from spectre.io.loader import get_checkpoint_info

%matplotlib inline


## Configuration

Set the path to your model checkpoint and configure the scan.


In [None]:
# Model checkpoint path
MODEL_PATH = Path("/path/to/your/model.safetensors")  # Update this!

# Output directory.
# parents=True allows OUTPUT_DIR to be a nested path (e.g. "./results/run_01")
# whose intermediate directories do not exist yet; exist_ok=True keeps the
# cell safe to re-run.
OUTPUT_DIR = Path("./scan_results")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Model ruleset (gpt_like, llama_like, mistral_like, phi_like)
RULESET = "gpt_like"

# Whether to save visualizations
SAVE_VISUALS = True
# Value passed to the "output.topk_visuals" configuration key
TOP_K_VISUALS = 20


## Create Configuration

Create a configuration object. You can either load it from a YAML file or build it programmatically.


In [None]:
# Option 1: Load from YAML file
# config = Config(config_path=Path("../scan_config.yaml"))

# Option 2: Create programmatically
config = Config()

# Model and output settings, taken from the constants in the configuration cell
base_settings = [
    ("model.path", str(MODEL_PATH)),
    ("model.ruleset", RULESET),
    ("output.dir", str(OUTPUT_DIR)),
    ("output.save_visuals", SAVE_VISUALS),
    ("output.topk_visuals", TOP_K_VISUALS),
]
for setting_key, setting_value in base_settings:
    config.set(setting_key, setting_value)

# Enable/disable specific detectors (each maps to the "<name>.enabled" key)
detector_switches = {
    "svd": True,
    "interlayer": True,
    "distribution": True,
    "robust": True,
    "energy": True,
    "rmt": True,
    "spectrogram": True,
    "gsp": True,
    "tda": True,
    "sequence_cp": True,
    "ot": True,
    "multiview": False,  # Default disabled
    "conformal": True,
}
for detector_name, is_enabled in detector_switches.items():
    config.set(f"{detector_name}.enabled", is_enabled)

# Read the values back from the config object to confirm what was stored
print("Configuration loaded:")
for label, setting_key in (
    ("Model", "model.path"),
    ("Ruleset", "model.ruleset"),
    ("Output", "output.dir"),
):
    print(f"  {label}: {config.get(setting_key)}")


## Check Model Info

Get basic information about the checkpoint before scanning.


In [None]:
# Print checkpoint metadata, or a warning when the configured path is missing.
if not MODEL_PATH.exists():
    print(f"Warning: Model path does not exist: {MODEL_PATH}")
    print("Please update MODEL_PATH in the configuration cell above.")
else:
    model_info = get_checkpoint_info(MODEL_PATH)
    print("Model Information:")
    print(f"  Path: {model_info['path']}")
    # size_bytes is reported in decimal gigabytes (divide by 1e9)
    size_gb = model_info['size_bytes'] / 1e9
    print(f"  Size: {size_gb:.2f} GB")
    print(f"  Format: {model_info['format']}")
    print(f"  Number of tensors: {model_info['num_tensors']}")
    print(f"  Total parameters: {model_info['total_params']:,}")


## Run Pipeline

Initialize and run the full pipeline: load → map → detect → score → output.


In [None]:
# Build the pipeline from the configuration object
pipeline = Pipeline(config)

# Echo the scan parameters; the trailing empty string produces the blank
# separator line before the pipeline's own output.
print(
    "Starting Spectre scan...",
    f"Model: {MODEL_PATH}",
    f"Output: {OUTPUT_DIR}",
    f"Ruleset: {RULESET}",
    "",
    sep="\n",
)

# Run the scan; `results` is consumed by the summary cell below
results = pipeline.run(checkpoint_path=MODEL_PATH)

print("\nScan complete!")


## View Results Summary

Display the scan summary with risk flags and suspect tensors.


In [None]:
# Pull the summary section out of the pipeline results (empty dict if absent)
summary = results.get("summary", {})

rule = "=" * 60
print(rule)
print("Scan Summary")
print(rule)
total_tensors = summary.get('total_tensors', 0)
print(f"Total tensors: {total_tensors}")
ensemble_sigma = summary.get('ensemble_sigma', 0.0)
print(f"Ensemble sigma: {ensemble_sigma:.4f}")
overall_flag = summary.get('flag', 'GREEN')
print(f"Overall flag: {overall_flag}")
print()

# Flag distribution (section is skipped when no counts were reported)
flag_counts = summary.get("flag_counts", {})
if flag_counts:
    print("Flag distribution:")
    for flag_name, n_flagged in flag_counts.items():
        print(f"  {flag_name}: {n_flagged}")
    print()

# Top suspects: at most the first ten entries of the suspects list
suspects = summary.get("suspects", [])
if suspects:
    shown_suspects = suspects[:10]
    print(f"Top {len(shown_suspects)} suspects:")
    for rank, entry in enumerate(shown_suspects, start=1):
        # Names are cut to 60 characters to keep the listing readable
        print(f"  {rank}. {entry['name'][:60]}")
        layer_label = entry.get('layer_idx', 'N/A')
        print(f"     Role: {entry['role']}, Layer: {layer_label}")
        print(f"     Score: {entry['ensemble_score']:.4f}, Flag: {entry['flag']}")
        print()


## Load and Analyze Features

Load the generated features CSV for detailed analysis.


In [None]:
# Locate the features CSV inside the scan output directory
features_path = OUTPUT_DIR / "features.csv"
if not features_path.exists():
    print(f"Features CSV not found: {features_path}")
else:
    df_features = pd.read_csv(features_path)
    n_rows = len(df_features)
    n_columns = len(df_features.columns)
    print(f"Loaded {n_rows} tensor features")
    print(f"\nColumns: {n_columns}")
    print("\nFirst few rows:")
    display(df_features.head())

    # Summary statistics are restricted to the numeric columns
    print("\nFeature statistics:")
    numeric_cols = df_features.select_dtypes(include=[np.number]).columns
    display(df_features[numeric_cols].describe())


## Load Per-Layer Summary

Analyze results by layer.


In [None]:
# Locate the per-layer summary CSV inside the scan output directory
layer_summary_path = OUTPUT_DIR / "per_layer_summary.csv"
if not layer_summary_path.exists():
    print(f"Layer summary CSV not found: {layer_summary_path}")
else:
    df_layers = pd.read_csv(layer_summary_path)
    print(f"Loaded {len(df_layers)} layer summaries")
    display(df_layers)

    # Two stacked panels (average on top, max below) that share the same
    # threshold reference lines and differ only in column, line style and label.
    if len(df_layers) > 0:
        fig, axes = plt.subplots(2, 1, figsize=(12, 8))

        panel_specs = [
            ('avg_ensemble_score', 'b-o', 'Average Ensemble Score'),
            ('max_ensemble_score', 'r-o', 'Max Ensemble Score'),
        ]
        for ax, (score_column, line_format, score_label) in zip(axes, panel_specs):
            ax.plot(df_layers['layer_idx'], df_layers[score_column], line_format, linewidth=2, markersize=4)
            ax.axhline(y=2.0, color='g', linestyle='--', label='GREEN threshold')
            ax.axhline(y=3.0, color='r', linestyle='--', label='RED threshold')
            ax.set_xlabel('Layer Index')
            ax.set_ylabel(score_label)
            ax.set_title(f'{score_label} by Layer')
            ax.legend()
            ax.grid(True, alpha=0.3)

        plt.tight_layout()
        # Keep a copy of the figure next to the other scan outputs
        plt.savefig(OUTPUT_DIR / "layer_scores.png", dpi=150, bbox_inches="tight")
        plt.show()


## Analyze Detector Scores

Examine scores from individual detectors.


In [None]:
# Tabulate and plot the per-detector scores of the ten highest-ranked suspects.
if not pipeline.scorer:
    print("Scorer not available")
else:
    # Get detector scores for top suspects
    suspects = pipeline.scorer.get_suspects(top_k=10)

    if suspects:
        # One row per suspect: truncated name, ensemble score, then one
        # column for each entry of the suspect's detector_scores mapping.
        detector_data = [
            {
                "name": entry["name"][:50],
                "ensemble": entry["ensemble_score"],
                **entry.get("detector_scores", {}),
            }
            for entry in suspects
        ]
        df_detectors = pd.DataFrame(detector_data)

        print("Detector scores for top suspects:")
        display(df_detectors)

        # The heatmap is drawn only for two or more suspects and at least
        # one detector column.
        non_detector_cols = ("name", "ensemble")
        detector_cols = [col for col in df_detectors.columns if col not in non_detector_cols]
        if len(df_detectors) > 1 and detector_cols:
            import seaborn as sns

            # Height grows with the number of suspects, with a floor of 6
            figure_height = max(6, len(df_detectors) * 0.3)
            plt.figure(figsize=(12, figure_height))

            # Rows are detectors, columns are tensors (hence the transpose)
            score_matrix = df_detectors[detector_cols].T
            sns.heatmap(
                score_matrix,
                xticklabels=df_detectors["name"],
                yticklabels=detector_cols,
                cmap="RdYlGn_r",
                center=0,
                vmin=-5,
                vmax=5,
                cbar_kws={"label": "Z-Score"}
            )
            plt.title("Detector Z-Scores for Top Suspects")
            plt.xlabel("Tensor")
            plt.ylabel("Detector")
            plt.xticks(rotation=45, ha="right")
            plt.tight_layout()
            plt.savefig(OUTPUT_DIR / "detector_scores_heatmap.png", dpi=150, bbox_inches="tight")
            plt.show()


## View Fingerprint JSON

Load and display the full fingerprint JSON.


In [None]:
# Load fingerprint JSON
fingerprint_path = OUTPUT_DIR / "fingerprint_v2.json"
if fingerprint_path.exists():
    # Read as UTF-8 explicitly so the result does not depend on the
    # platform's default text encoding.
    with open(fingerprint_path, "r", encoding="utf-8") as f:
        fingerprint = json.load(f)

    # Top-level sections are read with .get() so a missing section is shown
    # as empty / 'N/A' instead of raising KeyError.
    print("Fingerprint structure:")
    print(f"  Model: {fingerprint.get('model', {}).get('path', 'N/A')}")
    print(f"  Summary: {list(fingerprint.get('summary', {}).keys())}")
    print(f"  Per-tensor entries: {len(fingerprint.get('per_tensor', []))}")
    print(f"  Detectors: {list(fingerprint.get('detectors', {}).keys())}")

    # Display summary
    print("\nFull summary:")
    print(json.dumps(fingerprint.get("summary", {}), indent=2))
else:
    print(f"Fingerprint JSON not found: {fingerprint_path}")


## Custom Analysis

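Perform custom analysis on the results. A worked example (feature statistics and box plots grouped by tensor role) is provided in the final code cell of this notebook.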


## View Generated Visualizations

Display the visualizations that were generated during the scan.


In [None]:
# List the PNG files written to the visualizations directory and show the
# key heatmaps inline.
viz_dir = OUTPUT_DIR / "visualizations"
if viz_dir.exists():
    # Sort once and list once (the listing was previously printed twice)
    viz_files = sorted(viz_dir.glob("*.png"))
    print(f"Found {len(viz_files)} visualization files:")
    for viz_file in viz_files:
        print(f"  - {viz_file.name}")

    # Display key visualizations
    key_viz = ["similarity_heatmap.png", "zscore_heatmap.png"]

    for viz_name in key_viz:
        viz_path = viz_dir / viz_name
        if viz_path.exists():
            print(f"\n{viz_name}:")
            img = plt.imread(viz_path)
            fig, ax = plt.subplots(figsize=(14, 10))
            ax.imshow(img)
            ax.axis("off")
            # Use the stem so the title reads "Similarity Heatmap" rather
            # than "Similarity Heatmap.Png".
            ax.set_title(viz_path.stem.replace("_", " ").title())
            fig.tight_layout()
            plt.show()
else:
    print(f"Visualizations directory not found: {viz_dir}")
    # Only suggest enabling SAVE_VISUALS when it is actually disabled
    if not SAVE_VISUALS:
        print("Set SAVE_VISUALS=True to generate visualizations")
    else:
        print("SAVE_VISUALS is enabled but no visualizations directory was written; check the scan output above.")


## Detector Analysis Summary

Analyze which detectors are contributing most to the anomaly scores.


In [None]:
# Summarize, per detector, the absolute scores recorded by the scorer and
# plot the statistics next to the configured ensemble weights.
if pipeline.scorer:
    # Collect |score| for every (tensor, detector) pair, grouped by detector
    detector_contributions = {}
    for tensor_name, detector_scores in pipeline.scorer.detector_scores.items():
        for detector, score in detector_scores.items():
            detector_contributions.setdefault(detector, []).append(abs(score))

    # Compute statistics per detector
    detector_stats = {
        detector: {
            "mean": np.mean(scores),
            "std": np.std(scores),
            "max": np.max(scores),
            "count": len(scores),
            "high_anomaly_count": sum(1 for s in scores if s >= 3.0),
        }
        for detector, scores in detector_contributions.items()
        if scores
    }

    if not detector_stats:
        # Guard before building the frame: sorting an empty DataFrame by
        # "max" would raise KeyError because the column does not exist.
        print("No detector scores available to analyze")
    else:
        # orient="index" gives one row per detector and keeps the integer
        # dtype of the count columns (a transposed frame would coerce
        # every column to float).
        df_detector_stats = pd.DataFrame.from_dict(detector_stats, orient="index")
        df_detector_stats = df_detector_stats.sort_values("max", ascending=False)

        print("Detector Statistics:")
        print("=" * 60)
        display(df_detector_stats)

        # Detector weights (from configuration); detectors without a
        # configured weight are plotted as 0.0
        detector_weights = pipeline.scorer.weights
        weights_list = [detector_weights.get(det, 0.0) for det in df_detector_stats.index]

        # (bar heights, bar color, y-axis label, title) for each panel, in
        # row-major order of the 2x2 grid. color=None uses matplotlib's default.
        panel_specs = [
            (df_detector_stats["mean"], None, "Mean |Z-Score|", "Mean Detector Scores"),
            (df_detector_stats["max"], "r", "Max |Z-Score|", "Max Detector Scores"),
            (df_detector_stats["high_anomaly_count"], "orange", "Count (|Z| >= 3.0)", "High Anomaly Count per Detector"),
            (weights_list, "g", "Weight", "Detector Weights in Ensemble"),
        ]

        bar_positions = range(len(df_detector_stats))
        fig, axes = plt.subplots(2, 2, figsize=(14, 10))
        for ax, (heights, bar_color, y_label, panel_title) in zip(axes.flat, panel_specs):
            ax.bar(bar_positions, heights, color=bar_color)
            ax.set_xticks(bar_positions)
            ax.set_xticklabels(df_detector_stats.index, rotation=45, ha="right")
            ax.set_ylabel(y_label)
            ax.set_title(panel_title)
            ax.grid(True, alpha=0.3)

        # Threshold reference lines belong on the max-score panel only
        axes[0, 1].axhline(y=3.0, color="r", linestyle="--", label="RED threshold")
        axes[0, 1].axhline(y=4.5, color="darkred", linestyle="--", label="HARD_RED threshold")
        axes[0, 1].legend()

        plt.tight_layout()
        plt.savefig(OUTPUT_DIR / "detector_analysis.png", dpi=150, bbox_inches="tight")
        plt.show()
else:
    print("Scorer not available")


## Export Results Summary

Export a summary of results for reporting or further analysis.


In [None]:
def _json_default(value):
    """Fallback serializer for objects the json module cannot encode.

    Converts NumPy scalars and arrays to plain Python values and Path
    objects to strings; anything else raises TypeError, as json.dump
    would without a fallback.
    """
    if isinstance(value, np.generic):
        return value.item()
    if isinstance(value, np.ndarray):
        return value.tolist()
    if isinstance(value, Path):
        return str(value)
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


# Export summary to JSON
summary_export = {
    "model_path": str(MODEL_PATH),
    "ruleset": RULESET,
    "summary": summary,
    "output_dir": str(OUTPUT_DIR),
    "timestamp": pd.Timestamp.now().isoformat()
}

export_path = OUTPUT_DIR / "summary_export.json"
# Explicit UTF-8 keeps the file independent of the platform default encoding.
# The fallback serializer is used in case the summary contains NumPy values.
with open(export_path, "w", encoding="utf-8") as f:
    json.dump(summary_export, f, indent=2, default=_json_default)

print(f"Summary exported to: {export_path}")
print(f"\nAll outputs saved to: {OUTPUT_DIR}")
print("\nGenerated files:")
print("  - fingerprint_v2.json")
print("  - features.csv")
print("  - per_layer_summary.csv")
if SAVE_VISUALS:
    # Count PNG files only when the directory was actually created
    export_viz_dir = OUTPUT_DIR / "visualizations"
    viz_file_count = len(list(export_viz_dir.glob("*.png"))) if export_viz_dir.exists() else 0
    print(f"  - visualizations/ ({viz_file_count} files)")
print("  - summary_export.json")


In [None]:
# Example: Analyze features by role
if features_path.exists():
    df_features = pd.read_csv(features_path)

    # Aggregations wanted per feature column. Only columns present in the CSV
    # are passed to agg(): a key for a missing column raises KeyError even
    # when its list of functions is empty.
    requested_aggregations = {
        "spectral.stable_rank": ["mean", "std", "max"],
        "distribution.entropy": ["mean", "std"],
        "energy.frobenius": ["mean", "std"],
    }

    has_role = "role" in df_features.columns
    has_stable_rank = "spectral.stable_rank" in df_features.columns

    # Group by role
    if has_role and has_stable_rank:
        aggregation_spec = {
            column: functions
            for column, functions in requested_aggregations.items()
            if column in df_features.columns
        }
        role_stats = df_features.groupby("role").agg(aggregation_spec)

        print("Statistics by role:")
        display(role_stats)

        # Plot feature distributions by role: (column, title, y-axis label).
        # The entropy panel is added only when that column exists, so no
        # empty axis is left in the figure.
        boxplot_specs = [("spectral.stable_rank", "Stable Rank by Role", "Stable Rank")]
        if "distribution.entropy" in df_features.columns:
            boxplot_specs.append(("distribution.entropy", "Entropy by Role", "Entropy"))

        # squeeze=False keeps `axes` two-dimensional even with a single panel
        n_panels = len(boxplot_specs)
        fig, axes = plt.subplots(1, n_panels, figsize=(7 * n_panels, 5), squeeze=False)

        for ax, (column, panel_title, y_label) in zip(axes[0], boxplot_specs):
            df_features.boxplot(column=column, by="role", ax=ax)
            ax.set_title(panel_title)
            ax.set_xlabel("Role")
            ax.set_ylabel(y_label)

        plt.tight_layout()
        plt.savefig(OUTPUT_DIR / "role_analysis.png", dpi=150, bbox_inches="tight")
        plt.show()
    else:
        print("Role analysis skipped: features CSV needs both 'role' and 'spectral.stable_rank' columns")
else:
    print("Features CSV not available for custom analysis")
