<a href="https://colab.research.google.com/github/AvdMei/AI_Bullshit_Detector/blob/ages_branch/AI_Bullshit_Detector_Notebook.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# AI Bullshit Detector

Team: Age, Anika, Nate

Attribution: LiquidAI (model builder), HuggingFace (hosting)

LLM mode: LiquidAI LMF2-1.2B

Sundai project 11-jan-26

In [4]:
# =============================================================================
# LFM2 + IDK HEAD - FIXED COLAB NOTEBOOK
# =============================================================================
# Copy each cell into Google Colab to run the complete pipeline
# FIXED: Recursion error by separating base model loading from wrapper
# =============================================================================


# =============================================================================
# CELL 1: Install dependencies (run this first)
# =============================================================================
# !pip install -q transformers>=4.55.0 torch tqdm


# =============================================================================
# CELL 2: Imports and Setup
# =============================================================================

from dataclasses import dataclass
from typing import Optional, Tuple, Dict, List
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from transformers import AutoModelForCausalLM, AutoTokenizer
from transformers.utils import ModelOutput
from tqdm import tqdm

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {DEVICE}")


# =============================================================================
# CELL 3: Output Dataclass
# =============================================================================

@dataclass
class Lfm2IDKOutput(ModelOutput):
    """Output with IDK score."""
    loss: Optional[torch.FloatTensor] = None
    logits: Optional[torch.FloatTensor] = None
    idk_score: Optional[torch.FloatTensor] = None
    idk_components: Optional[dict] = None
    past_key_values: Optional[Tuple] = None
    hidden_states: Optional[Tuple] = None


# =============================================================================
# CELL 4: IDK Head Components
# =============================================================================

class FlowPredictor(nn.Module):
    """Predicts z12 from z8. High error = out-of-distribution."""

    def __init__(self, hidden_dim: int = 2048, bottleneck_dim: int = 512, dropout: float = 0.1):
        super().__init__()
        self.predictor = nn.Sequential(
            nn.Linear(hidden_dim, bottleneck_dim),
            nn.GELU(),
            nn.LayerNorm(bottleneck_dim),
            nn.Dropout(dropout),
            nn.Linear(bottleneck_dim, bottleneck_dim),
            nn.GELU(),
            nn.LayerNorm(bottleneck_dim),
            nn.Dropout(dropout),
            nn.Linear(bottleneck_dim, hidden_dim),
        )
        self.error_scale = nn.Parameter(torch.tensor(1.0))
        self.error_bias = nn.Parameter(torch.tensor(0.0))

    def forward(self, z8: torch.Tensor) -> torch.Tensor:
        return self.predictor(z8)

    def compute_error(self, z8: torch.Tensor, z12: torch.Tensor) -> torch.Tensor:
        z12_pred = self.forward(z8)
        mse = F.mse_loss(z12_pred, z12, reduction='none').mean(dim=-1)
        return torch.sigmoid(self.error_scale * mse + self.error_bias)


class HeadDisagreementModule(nn.Module):
    """Computes KV head disagreement. High variance = uncertainty."""

    def __init__(self):
        super().__init__()
        self.scale = nn.Parameter(torch.tensor(1.0))
        self.bias = nn.Parameter(torch.tensor(0.0))

    def forward(self, kv_8: Tuple, kv_12: Tuple) -> torch.Tensor:
        d8 = self._compute_disagreement(kv_8)
        d12 = self._compute_disagreement(kv_12)
        avg = (d8 + d12) / 2
        return torch.sigmoid(self.scale * avg + self.bias)

    def _compute_disagreement(self, kv: Tuple) -> torch.Tensor:
        k, v = kv
        if k.numel() == 0:
            return torch.zeros(1, device=k.device)
        k_last = k[:, :, -1, :].float()
        v_last = v[:, :, -1, :].float()
        return k_last.var(dim=1).mean(dim=-1) + v_last.var(dim=1).mean(dim=-1)


class EntropyModule(nn.Module):
    """Computes normalized output entropy."""

    def __init__(self, vocab_size: int = 65536):
        super().__init__()
        self.max_entropy = torch.log(torch.tensor(float(vocab_size)))
        self.scale = nn.Parameter(torch.tensor(1.0))
        self.bias = nn.Parameter(torch.tensor(0.0))

    def forward(self, logits: torch.Tensor) -> torch.Tensor:
        if logits.dim() == 3:
            logits = logits[:, -1, :]
        probs = F.softmax(logits.float(), dim=-1)
        entropy = -(probs * torch.log(probs + 1e-10)).sum(dim=-1)
        normalized = entropy / self.max_entropy.to(logits.device)
        return torch.sigmoid(self.scale * normalized + self.bias)


class IDKHead(nn.Module):
    """
    IDK Head: Combines flow prediction, head disagreement, and entropy
    into a single uncertainty score from 0-100.
    """

    def __init__(
        self,
        hidden_dim: int = 2048,
        bottleneck_dim: int = 512,
        dropout: float = 0.1,
        vocab_size: int = 65536,
        flow_weight: float = 0.4,
        head_weight: float = 0.3,
        entropy_weight: float = 0.3
    ):
        super().__init__()

        self.flow_predictor = FlowPredictor(hidden_dim, bottleneck_dim, dropout)
        self.head_disagreement = HeadDisagreementModule()
        self.entropy_module = EntropyModule(vocab_size)

        self.flow_weight = flow_weight
        self.head_weight = head_weight
        self.entropy_weight = entropy_weight

    def forward(
        self,
        z8: torch.Tensor = None,
        z12: torch.Tensor = None,
        kv_cache_8: Tuple = None,
        kv_cache_12: Tuple = None,
        logits: torch.Tensor = None
    ) -> Tuple[torch.Tensor, dict]:

        components = {}
        signals = []
        weights = []

        # 1. Flow prediction error
        if z8 is not None and z12 is not None:
            flow_error = self.flow_predictor.compute_error(z8, z12)
            components['flow_error'] = flow_error.item() if flow_error.dim() == 0 else flow_error[0].item()
            signals.append(flow_error)
            weights.append(self.flow_weight)

        # 2. Head disagreement
        if kv_cache_8 is not None and kv_cache_12 is not None:
            head_disagree = self.head_disagreement(kv_cache_8, kv_cache_12)
            components['head_disagreement'] = head_disagree.item() if head_disagree.dim() == 0 else head_disagree[0].item()
            signals.append(head_disagree)
            weights.append(self.head_weight)

        # 3. Entropy
        if logits is not None:
            entropy_signal = self.entropy_module(logits)
            components['entropy_signal'] = entropy_signal.item() if entropy_signal.dim() == 0 else entropy_signal[0].item()
            signals.append(entropy_signal)
            weights.append(self.entropy_weight)

        # Combine signals
        if len(signals) == 0:
            return torch.tensor([50.0]), components

        # Normalize weights
        total_weight = sum(weights)
        weights = [w / total_weight for w in weights]

        # Weighted sum
        combined = sum(s * w for s, w in zip(signals, weights))
        idk_score = combined * 100

        return idk_score, components


# =============================================================================
# CELL 5: Wrapper Class (Simplified - avoids recursion)
# =============================================================================

class LFM2WithIDK(nn.Module):
    """
    LFM2 with IDK uncertainty head.

    This is a simple nn.Module wrapper (not PreTrainedModel) to avoid
    recursion issues. For HuggingFace Hub hosting, use the full package version.
    """

    def __init__(
        self,
        base_model,  # Pass pre-loaded model
        hidden_dim: int = 2048,
        bottleneck_dim: int = 512,
        layer_8_idx: int = 8,
        layer_12_idx: int = 12,
    ):
        super().__init__()

        # Store base model (already loaded)
        self.lm = base_model

        # Freeze base model
        for param in self.lm.parameters():
            param.requires_grad = False

        # IDK Head
        self.idk_head = IDKHead(
            hidden_dim=hidden_dim,
            bottleneck_dim=bottleneck_dim,
        )

        # Layer indices (hidden_states[0] is embeddings, so +1)
        self.z8_idx = layer_8_idx + 1  # 9
        self.z12_idx = layer_12_idx + 1  # 13
        self.layer_8_idx = layer_8_idx
        self.layer_12_idx = layer_12_idx

    def forward(
        self,
        input_ids: torch.Tensor,
        attention_mask: torch.Tensor = None,
        past_key_values = None,
        output_idk_score: bool = True,
    ) -> Lfm2IDKOutput:

        # Forward through base model
        outputs = self.lm(
            input_ids=input_ids,
            attention_mask=attention_mask,
            past_key_values=past_key_values,
            use_cache=True,
            output_hidden_states=True,
            return_dict=True,
        )

        idk_score = None
        idk_components = None

        if output_idk_score and outputs.hidden_states is not None:
            # Extract z8, z12
            z8 = outputs.hidden_states[self.z8_idx][:, -1, :].float()
            z12 = outputs.hidden_states[self.z12_idx][:, -1, :].float()

            # Extract KV caches
            kv_cache_8, kv_cache_12 = None, None

            if outputs.past_key_values is not None:
                cache = outputs.past_key_values
                try:
                    kv_8 = cache[self.layer_8_idx]
                    kv_12 = cache[self.layer_12_idx]

                    if isinstance(kv_8, tuple) and len(kv_8) >= 2:
                        if kv_8[0].numel() > 0:
                            kv_cache_8 = (kv_8[0].float(), kv_8[1].float())

                    if isinstance(kv_12, tuple) and len(kv_12) >= 2:
                        if kv_12[0].numel() > 0:
                            kv_cache_12 = (kv_12[0].float(), kv_12[1].float())
                except (IndexError, TypeError, AttributeError):
                    pass

            # Compute IDK score
            idk_score, idk_components = self.idk_head(
                z8=z8,
                z12=z12,
                kv_cache_8=kv_cache_8,
                kv_cache_12=kv_cache_12,
                logits=outputs.logits.float()
            )

        return Lfm2IDKOutput(
            logits=outputs.logits,
            idk_score=idk_score,
            idk_components=idk_components,
            past_key_values=outputs.past_key_values,
            hidden_states=outputs.hidden_states,
        )

    def get_idk_parameters(self):
        """Get only IDK head parameters for training."""
        return self.idk_head.parameters()


# =============================================================================
# CELL 6: Training Data Extraction
# =============================================================================

def extract_samples_from_base_model(base_model, tokenizer, prompts, max_tokens=50):
    """
    Extract (z8, z12) training pairs from the base model.
    Uses the base model directly to avoid wrapper issues.
    """
    template = "<|startoftext|><|im_start|>user\n{question}<|im_end|>\n<|im_start|>assistant\n"
    samples = []

    base_model.eval()
    device = next(base_model.parameters()).device

    for prompt in tqdm(prompts, desc="Extracting samples"):
        input_text = template.format(question=prompt)
        input_ids = tokenizer.encode(input_text, return_tensors="pt").to(device)

        past = None
        current = input_ids

        for step in range(max_tokens):
            with torch.no_grad():
                outputs = base_model(
                    input_ids=current,
                    past_key_values=past,
                    use_cache=True,
                    output_hidden_states=True,
                    return_dict=True
                )

            # Extract hidden states
            # Layer 8 output is at index 9, layer 12 is at index 13
            hs = outputs.hidden_states
            z8 = hs[9][:, -1, :].float().cpu().squeeze(0)
            z12 = hs[13][:, -1, :].float().cpu().squeeze(0)

            samples.append({'z8': z8, 'z12': z12})

            # Get next token
            next_logits = outputs.logits[:, -1, :]
            next_token = next_logits.argmax(dim=-1)

            # Check EOS
            if next_token.item() == tokenizer.eos_token_id:
                break

            # Update for next step
            past = outputs.past_key_values
            current = next_token.unsqueeze(0)

    return samples


# =============================================================================
# CELL 7: Training Function
# =============================================================================

class SampleDataset(Dataset):
    def __init__(self, samples):
        self.samples = samples

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        return self.samples[idx]


def train_idk_head(model, samples, epochs=20, lr=1e-4, batch_size=64):
    """Train the IDK head using flow prediction loss."""

    dataset = SampleDataset(samples)
    loader = DataLoader(dataset, batch_size=batch_size, shuffle=True, drop_last=False)

    device = next(model.idk_head.parameters()).device
    optimizer = torch.optim.AdamW(model.get_idk_parameters(), lr=lr, weight_decay=0.01)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)

    print(f"Training on {len(samples)} samples, {len(loader)} batches per epoch")

    model.idk_head.train()

    for epoch in range(epochs):
        total_loss = 0
        num_batches = 0

        for batch in loader:
            z8 = batch['z8'].to(device)
            z12 = batch['z12'].to(device)

            # Flow prediction loss
            z12_pred = model.idk_head.flow_predictor(z8)
            loss = F.mse_loss(z12_pred, z12)

            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.idk_head.parameters(), 1.0)
            optimizer.step()

            total_loss += loss.item()
            num_batches += 1

        scheduler.step()
        avg_loss = total_loss / num_batches

        if (epoch + 1) % 5 == 0 or epoch == 0:
            print(f"Epoch {epoch+1:3d}/{epochs} | Loss: {avg_loss:.6f}")

    model.idk_head.eval()
    return model


# =============================================================================
# CELL 8: RUN THE COMPLETE PIPELINE
# =============================================================================

# --- STEP 1: Load base model and tokenizer ---
print("=" * 60)
print("STEP 1: Loading LFM2 base model...")
print("=" * 60)

tokenizer = AutoTokenizer.from_pretrained("LiquidAI/LFM2-1.2B")
base_model = AutoModelForCausalLM.from_pretrained(
    "LiquidAI/LFM2-1.2B",
    torch_dtype=torch.float32,
    device_map="auto"
)

print(f"Base model loaded on {base_model.device}")

# --- STEP 2: Create wrapper with IDK head ---
print("\n" + "=" * 60)
print("STEP 2: Creating LFM2 + IDK wrapper...")
print("=" * 60)

model = LFM2WithIDK(base_model=base_model)
model.to(DEVICE)

idk_params = sum(p.numel() for p in model.idk_head.parameters())
print(f"IDK head parameters: {idk_params:,}")

# --- STEP 3: Prepare training prompts ---
print("\n" + "=" * 60)
print("STEP 3: Preparing training prompts...")
print("=" * 60)

training_prompts = [

    # =========================================================================
    # CATEGORY 1: FACTUAL - COMMON KNOWLEDGE (Model should be CONFIDENT)
    # =========================================================================

    # Geography
    "What is the capital of France?",
    "What is the capital of Japan?",
    "What is the capital of Germany?",
    "What is the capital of Australia?",
    "What is the capital of Brazil?",
    "What is the capital of Canada?",
    "What is the capital of Egypt?",
    "What is the capital of India?",
    "What is the largest country by land area?",
    "What is the smallest country in the world?",
    "How many continents are there?",
    "What is the longest river in the world?",
    "What is the largest ocean?",
    "What is the tallest mountain on Earth?",

    # Science basics
    "What is water made of?",
    "What is the chemical formula for water?",
    "What is the chemical symbol for gold?",
    "What is the chemical symbol for oxygen?",
    "How many planets are in our solar system?",
    "What is the closest planet to the Sun?",
    "What is the largest planet in our solar system?",
    "What is the speed of light?",
    "What is the boiling point of water in Celsius?",
    "What is the freezing point of water in Fahrenheit?",

    # History & Literature
    "Who wrote Hamlet?",
    "Who wrote Romeo and Juliet?",
    "Who wrote 1984?",
    "Who wrote Pride and Prejudice?",
    "Who painted the Mona Lisa?",
    "Who invented the telephone?",
    "Who invented the light bulb?",
    "What year did World War 2 end?",
    "What year did World War 1 begin?",
    "Who was the first person to walk on the moon?",

    # Math & Numbers
    "What is 2 + 2?",
    "What is the square root of 144?",
    "How many days are in a year?",
    "How many hours are in a day?",
    "How many seconds are in a minute?",
    "What is the value of pi to two decimal places?",

    # =========================================================================
    # CATEGORY 2: TECHNICAL - EXPLANATIONS (Model should be moderately confident)
    # =========================================================================

    "Explain how neural networks learn",
    "What is machine learning?",
    "How does photosynthesis work?",
    "What is quantum entanglement?",
    "What is the difference between TCP and UDP?",
    "Explain the theory of relativity",
    "What is CRISPR gene editing?",
    "How do vaccines work?",
    "What is blockchain technology?",
    "How does encryption work?",
    "What is an algorithm?",
    "Explain how a CPU processes instructions",
    "What is the difference between RAM and ROM?",
    "How does GPS work?",
    "What is artificial intelligence?",
    "Explain the water cycle",
    "How do airplanes fly?",
    "What causes earthquakes?",
    "How does the immune system work?",
    "What is DNA and how does it work?",
    "Explain how batteries store energy",
    "What is the difference between AC and DC current?",

    # =========================================================================
    # CATEGORY 3: RECENT/TEMPORAL (Model should be UNCERTAIN - data cutoff)
    # =========================================================================

    "Who won the most recent Super Bowl?",
    "What is the current stock price of Apple?",
    "Who is the current president of France?",
    "Who is the current CEO of Twitter?",
    "What was the weather yesterday in New York?",
    "What movies came out this week?",
    "What is the current price of Bitcoin?",
    "Who won the most recent Grammy for Album of the Year?",
    "What is the current population of the world?",
    "Who won the last FIFA World Cup?",
    "What is the current interest rate set by the Federal Reserve?",
    "What was the closing price of the S&P 500 yesterday?",

    # =========================================================================
    # CATEGORY 4: SUBJECTIVE/OPINION (Model should be UNCERTAIN)
    # =========================================================================

    "What is the best programming language?",
    "Is pineapple good on pizza?",
    "What is the meaning of life?",
    "What is the best movie ever made?",
    "Should I learn Python or JavaScript?",
    "What is the best country to live in?",
    "Is coffee better than tea?",
    "What is the best music genre?",
    "Should I buy a Mac or PC?",
    "What is the most beautiful city in the world?",
    "Is it better to rent or buy a house?",
    "What is the best way to learn a new language?",
    "Should I exercise in the morning or evening?",
    "What is the best career choice for someone who likes math?",
    "Is reading better than watching movies?",

    # =========================================================================
    # CATEGORY 5: IMPOSSIBLE - FUTURE PREDICTIONS (Model should be VERY UNCERTAIN)
    # =========================================================================

    "What will happen tomorrow?",
    "Who will win the next election?",
    "What will the stock market do next week?",
    "Will it rain on my birthday?",
    "What will AI look like in 100 years?",
    "Who will win the next World Cup?",
    "What will be the next major scientific discovery?",
    "When will humans land on Mars?",
    "What will be the biggest news story next month?",
    "Will there be a recession next year?",
    "What company will be the most valuable in 10 years?",
    "What will be the next pandemic?",

    # =========================================================================
    # CATEGORY 6: IMPOSSIBLE - PERSONAL/PRIVATE (Model CANNOT know)
    # User-provided questions about personal unknowable information
    # =========================================================================

    "What will I personally eat for dinner tomorrow night?",
    "What number am I thinking of right now?",
    "What will be the first typo I make tomorrow?",
    "What is the password to my personal email account?",
    "What will be the exact time I fall asleep tonight?",
    "What decision will I make five minutes from now?",
    "What is written on the last sticky note I used?",
    "What will the weather be at my exact location in 30 days?",
    "What song will get stuck in my head next?",
    "What is inside the unopened box on my desk?",
    "What will be the next thought I have?",
    "What will be the next object I touch?",
    "What will my mood be exactly at noon tomorrow?",
    "What private message did I last delete?",
    "What will be the exact price of gas at my nearest station tomorrow?",
    "What will be the next dream I remember?",
    "What will I write as my next password?",
    "What will be the next mistake I make today?",
    "What will be the exact wording of my next text message?",
    "What will be the last thing I think about before I fall asleep tonight?",

    # =========================================================================
    # CATEGORY 7: OBSCURE FACTUAL (Model likely UNCERTAIN - rare knowledge)
    # User-provided questions about obscure facts
    # =========================================================================

    "Who received the IEEE Frank Rosenblatt Award in 2010?",
    "Who was awarded the Oceanography Society's Jerlov Award in 2018?",
    "What's the name of the women's liberal arts college in Cambridge, Massachusetts?",
    "In whose honor was the Leipzig 1877 tournament organized?",
    "According to Karl Küchler, what did Empress Elizabeth of Austria's favorite sculpture depict, which was made for her villa Achilleion at Corfu?",
    "How much money, in euros, was the surgeon held responsible for Stella Obasanjo's death ordered to pay her son?",
    "What were the month and year when Obama told Christianity Today, 'I am a Christian, and I am a devout Christian. I believe in the redemptive death and resurrection of Jesus Christ'?",
    "Who appointed the Chief Justice of India, Mirza Hameedullah Beg, in 1977?",
    "What is the name of the former Prime Minister of Iceland who worked as a cabin crew member until 1971?",
    "To whom did Mehbooba Mufti Sayed contest the 2019 Lok Sabha elections and lose?",
    "How many fouls did Inter commit in the Champions League final match between Bayern and Inter on May 23, 2010?",
    "What year did the Lego part with ID gal56 first release?",
    "In which year did the Japanese scientist Koichi Mizushima receive the Kato Memorial Prize?",
    "In which year did Melbourne's Monash Gallery of Art (MGA) rebrand and become the Museum of Australian Photography (MAPh)?",
    "Who requested the Federal Aviation Administration (FAA) implement a 900 sq mi (2,300 km²) temporary flight restriction zone over the operations areas of the Deepwater Horizon?",
    "What signature piece of the MOBA did Scott Wilson discover on the curb between two trash cans?",
    "What player scored all the conversions for Spain in the rugby match between Spain and Romania that was part of the 2022 Rugby Europe Championship on February 27, 2022?",
    "What is the surname of the psychiatrist who prescribes medication for Marie Hanson for her periodic blackouts in Season 1, Episode 20 of Ally McBeal?",
    "What is the British-American kickboxer Andrew Tate's kickboxing name?",
    "What position was John Gilbert Layton appointed to in Quebec from 1969 until 1970?",

    # =========================================================================
    # CATEGORY 8: ADDITIONAL OBSCURE/TRIVIA (Model likely UNCERTAIN)
    # =========================================================================

    "What is the population of Tuvalu?",
    "Who was the 23rd Prime Minister of Canada?",
    "What is the atomic weight of Lawrencium?",
    "What is the GDP of Liechtenstein?",
    "Who won the Nobel Prize in Chemistry in 1987?",
    "What is the deepest point in the Indian Ocean?",
    "Who was the Roman Emperor in 117 AD?",
    "What is the national bird of Bhutan?",
    "Who designed the Sydney Opera House?",
    "What year was the Treaty of Westphalia signed?",
    "What is the chemical formula for rust?",
    "Who invented the zipper?",
    "What is the currency of Myanmar?",
    "What is the second largest moon of Saturn?",
    "Who wrote the novel 'The Master and Margarita'?",
    "What year was the first email sent?",
    "What is the tallest building in South America?",
    "Who was the first female Prime Minister of Pakistan?",
    "What is the capital of Burkina Faso?",
    "What element has the atomic number 79?",

    # =========================================================================
    # CATEGORY 9: ADDITIONAL IMPOSSIBLE/RANDOM (Model should be VERY UNCERTAIN)
    # =========================================================================

    "What am I thinking right now?",
    "What will be the lottery numbers next week?",
    "What is the exact number of grains of sand on Earth?",
    "What will be the exact temperature at noon tomorrow in my backyard?",
    "How many birds are flying right now?",
    "What will be the next word I type?",
    "What color socks am I wearing?",
    "What did I have for breakfast three weeks ago?",
    "What will be trending on social media tomorrow?",
    "What is my favorite color?",
    "How many times will I blink today?",
    "What will be my next Google search?",
    "What will I name my future pet?",
    "What time will I wake up next Saturday?",
    "What will be the headline news in exactly one year?",

    # =========================================================================
    # CATEGORY 10: PARADOXES AND UNANSWERABLE (Model should be VERY UNCERTAIN)
    # =========================================================================

    "What happens when an unstoppable force meets an immovable object?",
    "Can God create a rock so heavy that even God cannot lift it?",
    "If a tree falls in a forest and no one is around, does it make a sound?",
    "What existed before the universe?",
    "What is north of the North Pole?",
    "What is the sound of one hand clapping?",
    "Why is there something rather than nothing?",
    "What is outside the universe?",
    "Can you prove that reality is not a simulation?",
    "What is the last digit of pi?",
]

print(f"Prepared {len(training_prompts)} diverse prompts")

# --- STEP 4: Extract training samples ---
print("\n" + "=" * 60)
print("STEP 4: Extracting training samples...")
print("=" * 60)

samples = extract_samples_from_base_model(
    base_model=base_model,  # Use base model directly
    tokenizer=tokenizer,
    prompts=training_prompts,
    max_tokens=30
)

print(f"Extracted {len(samples)} (z8, z12) pairs")

# --- STEP 5: Train IDK head ---
print("\n" + "=" * 60)
print("STEP 5: Training IDK head...")
print("=" * 60)

model = train_idk_head(model, samples, epochs=15, lr=1e-4, batch_size=32)

# --- STEP 6: Test inference ---
print("\n" + "=" * 60)
print("STEP 6: Testing inference with IDK scores...")
print("=" * 60)

test_prompts = [
    ("What is 2 + 2?", "Should be CONFIDENT"),
    ("What is the capital of France?", "Should be CONFIDENT"),
    ("What is the meaning of life?", "Should be UNCERTAIN"),
    ("Who will win the lottery tomorrow?", "Should be VERY UNCERTAIN"),
    ("What is the 1000th digit of pi?", "Should be VERY UNCERTAIN"),
]

template = "<|startoftext|><|im_start|>user\n{question}<|im_end|>\n<|im_start|>assistant\n"

model.eval()
print("\nResults:")
print("-" * 70)

for question, expected in test_prompts:
    input_text = template.format(question=question)
    input_ids = tokenizer.encode(input_text, return_tensors="pt").to(DEVICE)

    with torch.no_grad():
        outputs = model(input_ids, output_idk_score=True)

    idk = outputs.idk_score
    if isinstance(idk, torch.Tensor):
        idk = idk.item() if idk.numel() == 1 else idk[0].item()

    # Interpret
    if idk < 20:
        conf = "🟢 Very Confident"
    elif idk < 40:
        conf = "🟡 Confident"
    elif idk < 60:
        conf = "🟠 Uncertain"
    elif idk < 80:
        conf = "🔴 Low Confidence"
    else:
        conf = "⚫ Very Uncertain"

    print(f"\nQ: {question}")
    print(f"   IDK Score: {idk:.1f}/100 {conf}")
    print(f"   Expected: {expected}")
    if outputs.idk_components:
        print(f"   Components: {outputs.idk_components}")

print("\n" + "=" * 60)
print("✅ COMPLETE!")
print("=" * 60)


# =============================================================================
# CELL 9: (OPTIONAL) Generate text with per-token IDK
# =============================================================================

def generate_with_idk(model, tokenizer, prompt, max_tokens=30, temperature=0.3):
    """Generate text with per-token IDK scores."""
    template = "<|startoftext|><|im_start|>user\n{question}<|im_end|>\n<|im_start|>assistant\n"
    input_text = template.format(question=prompt)
    input_ids = tokenizer.encode(input_text, return_tensors="pt").to(DEVICE)

    generated_tokens = []
    idk_scores = []

    current = input_ids
    past = None

    model.eval()
    for _ in range(max_tokens):
        with torch.no_grad():
            outputs = model(current, past_key_values=past, output_idk_score=True)

        # Get IDK score
        idk = outputs.idk_score
        if isinstance(idk, torch.Tensor):
            idk = idk.item() if idk.numel() == 1 else idk[0].item()
        idk_scores.append(idk)

        # Sample next token
        logits = outputs.logits[:, -1, :]
        if temperature > 0:
            probs = F.softmax(logits / temperature, dim=-1)
            next_token = torch.multinomial(probs, num_samples=1)
        else:
            next_token = logits.argmax(dim=-1, keepdim=True)

        generated_tokens.append(next_token.item())

        # Check EOS
        if next_token.item() == tokenizer.eos_token_id:
            break

        past = outputs.past_key_values
        current = next_token

    text = tokenizer.decode(generated_tokens, skip_special_tokens=True)

    return {
        'text': text,
        'tokens': generated_tokens,
        'idk_scores': idk_scores,
        'mean_idk': sum(idk_scores) / len(idk_scores) if idk_scores else 0,
    }


# Test generation
print("\n" + "=" * 60)
print("BONUS: Generation with per-token IDK")
print("=" * 60)

result = generate_with_idk(model, tokenizer, "What is machine learning?", max_tokens=20)
print(f"\nQuestion: What is machine learning?")
print(f"Answer: {result['text']}")
print(f"Mean IDK: {result['mean_idk']:.1f}/100")
print(f"\nPer-token IDK scores:")
for i, (tok, idk) in enumerate(zip(result['tokens'], result['idk_scores'])):
    tok_str = tokenizer.decode([tok])
    print(f"  {i}: '{tok_str}' -> IDK: {idk:.1f}")

Using device: cuda
STEP 1: Loading LFM2 base model...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/434 [00:00<?, ?B/s]

chat_template.jinja: 0.00B [00:00, ?B/s]

config.json: 0.00B [00:00, ?B/s]

`torch_dtype` is deprecated! Use `dtype` instead!


model.safetensors:   0%|          | 0.00/2.34G [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/137 [00:00<?, ?B/s]

Base model loaded on cuda:0

STEP 2: Creating LFM2 + IDK wrapper...
IDK head parameters: 2,364,422

STEP 3: Preparing training prompts...
Prepared 186 diverse prompts

STEP 4: Extracting training samples...


Extracting samples: 100%|██████████| 186/186 [01:40<00:00,  1.85it/s]


Extracted 5344 (z8, z12) pairs

STEP 5: Training IDK head...
Training on 5344 samples, 167 batches per epoch
Epoch   1/15 | Loss: 0.115839
Epoch   5/15 | Loss: 0.051918
Epoch  10/15 | Loss: 0.040745
Epoch  15/15 | Loss: 0.039628

STEP 6: Testing inference with IDK scores...

Results:
----------------------------------------------------------------------

Q: What is 2 + 2?
   IDK Score: 63.6/100 🔴 Low Confidence
   Expected: Should be CONFIDENT
   Components: {'flow_error': 0.5061379671096802, 'head_disagreement': 0.9274423718452454, 'entropy_signal': 0.5170724391937256}

Q: What is the capital of France?
   IDK Score: 63.2/100 🔴 Low Confidence
   Expected: Should be CONFIDENT
   Components: {'flow_error': 0.5049847960472107, 'head_disagreement': 0.9281729459762573, 'entropy_signal': 0.5042762756347656}

Q: What is the meaning of life?
   IDK Score: 63.1/100 🔴 Low Confidence
   Expected: Should be UNCERTAIN
   Components: {'flow_error': 0.5069972276687622, 'head_disagreement': 0.9245220

# save the model weights for HuggingFace

In [5]:
import torch
from google.colab import files

torch.save(model.idk_head.state_dict(), "pytorch_model.bin")
files.download("pytorch_model.bin")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

# Demo the Liquid AI model and Bullshit Detector in action

In [8]:
# =============================================================================
# DEMO CELL - Run after training completes
# =============================================================================
# The model and tokenizer are already loaded from training above.
# Just change QUESTION and re-run this cell to test different prompts!
# =============================================================================

# ============================================
# CHANGE THIS QUESTION AND RE-RUN!
# ============================================

QUESTION = "What is the capital of France?"

# ============================================

# Template for LFM2
template = "<|startoftext|><|im_start|>user\n{question}<|im_end|>\n<|im_start|>assistant\n"

# Encode
prompt = template.format(question=QUESTION)
input_ids = tokenizer.encode(prompt, return_tensors="pt").to(DEVICE)

# Generate with IDK tracking
generated_tokens = []
idk_scores = []
current = input_ids
past = None

model.eval()
for _ in range(50):  # max tokens
    with torch.no_grad():
        outputs = model(current, past_key_values=past, output_idk_score=True)

    # Get IDK score
    idk = outputs.idk_score
    if isinstance(idk, torch.Tensor):
        idk = idk.item() if idk.numel() == 1 else idk[0].item()
    idk_scores.append(idk)

    # Sample next token
    logits = outputs.logits[:, -1, :]
    probs = F.softmax(logits / 0.3, dim=-1)  # temperature=0.3
    next_token = torch.multinomial(probs, num_samples=1)

    generated_tokens.append(next_token.item())

    # Check EOS
    if next_token.item() == tokenizer.eos_token_id:
        break

    past = outputs.past_key_values
    current = next_token

# Decode answer
answer = tokenizer.decode(generated_tokens, skip_special_tokens=True)
mean_idk = sum(idk_scores) / len(idk_scores) if idk_scores else 0

# Get label
if mean_idk < 30:
    label = "[GREEN] Confident"
elif mean_idk < 50:
    label = "[YELLOW] Moderate"
elif mean_idk < 70:
    label = "[ORANGE] Uncertain"
else:
    label = "[RED] Very Uncertain"

# Display results
print("=" * 60)
print("QUESTION:", QUESTION)
print("-" * 60)
print("ANSWER:", answer)
print("-" * 60)
print("IDK SCORE: {:.1f}/100 - {}".format(mean_idk, label))
print("=" * 60)
print("")
print("IDK Score Guide:")
print("  0-30  = Confident (likely reliable)")
print("  30-50 = Moderate (probably fine)")
print("  50-70 = Uncertain (verify this)")
print("  70-100 = Very Uncertain (high hallucination risk)")

QUESTION: What is the capital of France?
------------------------------------------------------------
ANSWER: The capital of France is Paris. This city is not only the political center of France, but also a global hub for art, fashion, gastronomy, and culture. Known as the "City of Light" or "La Ville Lumière,"
------------------------------------------------------------
IDK SCORE: 63.4/100 - [ORANGE] Uncertain

IDK Score Guide:
  0-30  = Confident (likely reliable)
  30-50 = Moderate (probably fine)
  50-70 = Uncertain (verify this)
  70-100 = Very Uncertain (high hallucination risk)


In [1]:
!pip install lion_pytorch

Collecting lion_pytorch
  Downloading lion_pytorch-0.2.3-py3-none-any.whl.metadata (616 bytes)
Downloading lion_pytorch-0.2.3-py3-none-any.whl (6.6 kB)
Installing collected packages: lion_pytorch
Successfully installed lion_pytorch-0.2.3


In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import numpy as np
import pandas as pd
from dataclasses import dataclass, field
from typing import List, Dict, Tuple
from tqdm import tqdm
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import MinMaxScaler, normalize
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
TEMPLATE_WITHOUT_ANSWER = "<|startoftext|><|im_start|>user\n{question}<|im_end|>\n<|im_start|>assistant\n"

print(f"Device: {DEVICE}")

Device: cuda


In [3]:
@dataclass
class HallucinationFeatures:
    """Features extracted for hallucination detection."""
    prompt: str
    question: str
    input_length: int
    z8_states: List[torch.Tensor] = field(default_factory=list)
    z12_states: List[torch.Tensor] = field(default_factory=list)
    kv8_head_disagreement: List[float] = field(default_factory=list)
    kv12_head_disagreement: List[float] = field(default_factory=list)
    generated_tokens: List[int] = field(default_factory=list)
    token_probs: List[float] = field(default_factory=list)
    token_entropies: List[float] = field(default_factory=list)
    top5_tokens: List[List[Tuple[str, float]]] = field(default_factory=list)
    full_text: str = ""
    mean_prob: float = 0.0
    mean_entropy: float = 0.0
    mean_head_disagreement_8: float = 0.0
    mean_head_disagreement_12: float = 0.0

In [None]:
def compute_head_disagreement(kv_cache_layer) -> float:
    """Compute variance across KV heads - higher = more disagreement."""
    k, v = kv_cache_layer
    if k.numel() == 0:
        return 0.0
    k_last = k[:, :, -1, :].float()
    v_last = v[:, :, -1, :].float()
    return k_last.var(dim=1).mean().item() + v_last.var(dim=1).mean().item()


def extract_hallucination_features(model, tokenizer, question: str, max_new_tokens: int = 50) -> HallucinationFeatures:
    """Extract z8, z12, KV cache stats, and output probabilities."""
    prompt = TEMPLATE_WITHOUT_ANSWER.format(question=question)
    input_ids = tokenizer.encode(prompt, return_tensors="pt").to(model.device)

    features = HallucinationFeatures(prompt=prompt, question=question, input_length=input_ids.shape[1])
    past_key_values = None
    current_ids = input_ids

    for step in range(max_new_tokens):
        with torch.no_grad():
            outputs = model(
                current_ids,
                past_key_values=past_key_values,
                use_cache=True,
                output_hidden_states=True,
                return_dict=True
            )

        # Hidden states: index 9 = layer 8 output, index 13 = layer 12 output
        hidden_states = outputs.hidden_states
        z8 = hidden_states[9][:, -1, :].float().cpu()
        z12 = hidden_states[13][:, -1, :].float().cpu()

        features.z8_states.append(z8)
        features.z12_states.append(z12)

        # KV cache head disagreement
        cache = outputs.past_key_values
        features.kv8_head_disagreement.append(compute_head_disagreement(cache[8]))
        features.kv12_head_disagreement.append(compute_head_disagreement(cache[12]))

        # Logits and probabilities
        logits = outputs.logits[:, -1, :].float()
        probs = F.softmax(logits, dim=-1)
        entropy = -(probs * torch.log(probs + 1e-10)).sum().item()
        next_token = torch.argmax(logits, dim=-1)
        selected_prob = probs[0, next_token.item()].item()

        # Top-5
        top5_probs, top5_idx = torch.topk(probs[0], 5)
        top5 = [(tokenizer.decode([idx.item()]), prob.item()) for idx, prob in zip(top5_idx, top5_probs)]

        features.generated_tokens.append(next_token.item())
        features.token_probs.append(selected_prob)
        features.token_entropies.append(entropy)
        features.top5_tokens.append(top5)

        if next_token.item() == tokenizer.eos_token_id:
            break

        past_key_values = outputs.past_key_values
        current_ids = next_token.unsqueeze(0)

    # Summary stats
    features.full_text = tokenizer.decode(features.generated_tokens, skip_special_tokens=True)
    features.mean_prob = np.mean(features.token_probs) if features.token_probs else 0.0
    features.mean_entropy = np.mean(features.token_entropies) if features.token_entropies else 0.0
    features.mean_head_disagreement_8 = np.mean(features.kv8_head_disagreement) if features.kv8_head_disagreement else 0.0
    features.mean_head_disagreement_12 = np.mean(features.kv12_head_disagreement) if features.kv12_head_disagreement else 0.0

    return features

In [None]:
class Z8Z12LogitsPredictor(nn.Module):
    """Predict z12 from z8, and logits summary from z12."""
    def __init__(self, hidden_dim: int = 2048, bottleneck_dim: int = 512):
        super().__init__()
        self.z8_to_z12 = nn.Sequential(
            nn.Linear(hidden_dim, bottleneck_dim),
            nn.GELU(),
            nn.LayerNorm(bottleneck_dim),
            nn.Linear(bottleneck_dim, bottleneck_dim),
            nn.GELU(),
            nn.LayerNorm(bottleneck_dim),
            nn.Linear(bottleneck_dim, hidden_dim),
        )
        self.z12_to_logits_summary = nn.Sequential(
            nn.Linear(hidden_dim, bottleneck_dim),
            nn.GELU(),
            nn.LayerNorm(bottleneck_dim),
            nn.Linear(bottleneck_dim, 6),  # entropy + top5 probs
        )

    def forward(self, z8):
        z12_pred = self.z8_to_z12(z8)
        logits_summary_pred = self.z12_to_logits_summary(z12_pred)
        return z12_pred, logits_summary_pred

    def compute_oos_score(self, z8, z12, logits_summary, alpha=0.7, beta=0.3):
        z12_pred, logits_pred = self.forward(z8)
        err_z12 = F.mse_loss(z12_pred, z12, reduction='none').mean(dim=-1)
        err_logits = F.mse_loss(logits_pred, logits_summary, reduction='none').mean(dim=-1)
        return alpha * err_z12 + beta * err_logits


class HallucinationDataset(Dataset):
    def __init__(self, features_list: List[HallucinationFeatures]):
        self.samples = []
        for feat in features_list:
            for i in range(len(feat.z8_states)):
                top5_probs = [p for _, p in feat.top5_tokens[i]]
                while len(top5_probs) < 5:
                    top5_probs.append(0.0)
                logits_summary = torch.tensor([feat.token_entropies[i]] + top5_probs[:5], dtype=torch.float32)
                self.samples.append({
                    'z8': feat.z8_states[i].squeeze(0),
                    'z12': feat.z12_states[i].squeeze(0),
                    'logits_summary': logits_summary,
                })

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        return self.samples[idx]


def train_predictor(features_list, epochs=20, lr=1e-4, batch_size=64):
    dataset = HallucinationDataset(features_list)
    loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    print(f"Training on {len(dataset)} token samples")

    predictor = Z8Z12LogitsPredictor().to(DEVICE)
    optimizer = torch.optim.AdamW(predictor.parameters(), lr=lr)

    predictor.train()
    for epoch in range(epochs):
        total_loss = 0
        for batch in loader:
            z8 = batch['z8'].to(DEVICE)
            z12 = batch['z12'].to(DEVICE)
            logits_summary = batch['logits_summary'].to(DEVICE)

            z12_pred, logits_pred = predictor(z8)
            loss_z12 = F.mse_loss(z12_pred, z12)
            loss_logits = F.mse_loss(logits_pred, logits_summary)
            loss = 0.7 * loss_z12 + 0.3 * loss_logits

            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(predictor.parameters(), 1.0)
            optimizer.step()
            total_loss += loss.item()

        if (epoch + 1) % 5 == 0:
            print(f"Epoch {epoch+1:3d}/{epochs} | Loss: {total_loss/len(loader):.6f}")

    return predictor

In [None]:
def compute_oos_scores(predictor, features_list):
    predictor.eval()
    oos_scores = []

    for feat in features_list:
        if len(feat.z8_states) == 0:
            oos_scores.append(0.0)
            continue

        scores = []
        for i in range(len(feat.z8_states)):
            z8 = feat.z8_states[i].to(DEVICE)
            z12 = feat.z12_states[i].to(DEVICE)

            top5_probs = [p for _, p in feat.top5_tokens[i]]
            while len(top5_probs) < 5:
                top5_probs.append(0.0)
            logits_summary = torch.tensor([feat.token_entropies[i]] + top5_probs[:5], dtype=torch.float32, device=DEVICE).unsqueeze(0)

            with torch.no_grad():
                score = predictor.compute_oos_score(z8, z12, logits_summary)
            scores.append(score.item())

        oos_scores.append(np.mean(scores))

    return oos_scores

In [None]:
def compute_paraphrase_stats(sim_matrix, n_questions, group_size=5):
    """Compute within-group similarity stats."""
    stats = []
    for idx in range(n_questions):
        group_start = (idx // group_size) * group_size
        group_end = min(group_start + group_size, n_questions)

        same_group_sims = [sim_matrix[idx, j] for j in range(group_start, group_end) if j != idx]

        if same_group_sims:
            stats.append({
                'mean_sim': np.mean(same_group_sims),
                'std_sim': np.std(same_group_sims),
                'min_sim': np.min(same_group_sims),
            })
        else:
            stats.append({'mean_sim': 1.0, 'std_sim': 0.0, 'min_sim': 1.0})

    return stats

In [None]:
# Assumes `model` and `tokenizer` are already loaded
# Load data
df = pd.read_csv("AI_Bullshit_Detector/data/paraphase-prompts.csv")
questions = df['question'].tolist()

# Step 1: Extract features
print("=" * 60)
print("STEP 1: Extracting internal features...")
print("=" * 60)

all_features = []
for question in tqdm(questions, desc="Processing"):
    try:
        features = extract_hallucination_features(model, tokenizer, question, max_new_tokens=100)
        all_features.append(features)
    except Exception as e:
        print(f"Error: {e}")
        all_features.append(HallucinationFeatures(prompt="", question=question, input_length=0))

df['llm_output'] = [f.full_text for f in all_features]
df['mean_prob'] = [f.mean_prob for f in all_features]
df['mean_entropy'] = [f.mean_entropy for f in all_features]
df['head_disagree_8'] = [f.mean_head_disagreement_8 for f in all_features]
df['head_disagree_12'] = [f.mean_head_disagreement_12 for f in all_features]

# Step 2: Train predictor
print("\n" + "=" * 60)
print("STEP 2: Training z8 → z12 → logits predictor...")
print("=" * 60)

predictor = train_predictor(all_features, epochs=20, lr=1e-4)

# Step 3: OOS scores
print("\n" + "=" * 60)
print("STEP 3: Computing OOS scores...")
print("=" * 60)

df['oos_score'] = compute_oos_scores(predictor, all_features)

# Step 4: Load similarity matrix
print("\n" + "=" * 60)
print("STEP 4: Loading paraphrase similarity...")
print("=" * 60)

sim_df = pd.read_csv("AI_Bullshit_Detector/data/llm_output_cosine_similarity.csv", index_col=0)
sim_matrix = sim_df.values
paraphrase_stats = compute_paraphrase_stats(sim_matrix, len(questions), group_size=5)

df['paraphrase_mean_sim'] = [s['mean_sim'] for s in paraphrase_stats]
df['paraphrase_std_sim'] = [s['std_sim'] for s in paraphrase_stats]

# Step 5: Combined score
print("\n" + "=" * 60)
print("STEP 5: Computing combined score...")
print("=" * 60)

scaler = MinMaxScaler()
oos_norm = scaler.fit_transform(df[['oos_score']].fillna(0))
head_norm = scaler.fit_transform(df[['head_disagree_12']].fillna(0))
sim_inverted = 1 - df['paraphrase_mean_sim'].fillna(1).values.reshape(-1, 1)
sim_norm = scaler.fit_transform(sim_inverted)

df['combined_score'] = 0.4 * oos_norm.flatten() + 0.3 * head_norm.flatten() + 0.3 * sim_norm.flatten()

print(f"\n✅ Done! Combined score range: [{df['combined_score'].min():.3f}, {df['combined_score'].max():.3f}]")


STEP 1: Extracting internal features...


Processing:  18%|█▊        | 18/100 [00:52<04:02,  2.96s/it]

In [None]:
fig, axes = plt.subplots(2, 3, figsize=(16, 11))

# 1. OOS vs Paraphrase
ax = axes[0, 0]
scatter = ax.scatter(df['oos_score'], df['paraphrase_mean_sim'], alpha=0.6, c=df['mean_entropy'], cmap='viridis', s=50)
plt.colorbar(scatter, ax=ax, label='Mean Entropy')
ax.set_xlabel('OOS Score (z8→z12 error)')
ax.set_ylabel('Paraphrase Mean Similarity')
ax.set_title('OOS Score vs Paraphrase Consistency')
ax.grid(True, alpha=0.3)

# 2. Head Disagree vs Paraphrase Spread
ax = axes[0, 1]
ax.scatter(df['head_disagree_12'], df['paraphrase_std_sim'], alpha=0.6, s=50)
ax.set_xlabel('Head Disagreement (Layer 12)')
ax.set_ylabel('Paraphrase Sim Spread (std)')
ax.set_title('Head Disagreement vs Instability')
ax.grid(True, alpha=0.3)

# 3. Correlation heatmap
ax = axes[0, 2]
score_cols = ['oos_score', 'head_disagree_8', 'head_disagree_12', 'mean_entropy', 'mean_prob', 'paraphrase_mean_sim', 'paraphrase_std_sim']
corr = df[score_cols].corr()
sns.heatmap(corr, annot=True, fmt='.2f', ax=ax, cmap='RdBu_r', center=0, square=True)
ax.set_title('Score Correlations')

# 4. Layer 8 vs 12
ax = axes[1, 0]
ax.scatter(df['head_disagree_8'], df['head_disagree_12'], alpha=0.6, s=50)
ax.plot([df['head_disagree_8'].min(), df['head_disagree_8'].max()],
        [df['head_disagree_8'].min(), df['head_disagree_8'].max()], 'r--', label='y=x')
ax.set_xlabel('Head Disagree (L8)')
ax.set_ylabel('Head Disagree (L12)')
ax.set_title('Layer 8 vs 12 Disagreement')
ax.legend()
ax.grid(True, alpha=0.3)

# 5. Combined score distribution
ax = axes[1, 1]
ax.hist(df['combined_score'], bins=25, edgecolor='black', alpha=0.7, color='steelblue')
ax.axvline(df['combined_score'].quantile(0.9), color='red', linestyle='--', lw=2, label=f'90th: {df["combined_score"].quantile(0.9):.3f}')
ax.axvline(df['combined_score'].median(), color='orange', linestyle='-', lw=2, label=f'Median: {df["combined_score"].median():.3f}')
ax.set_xlabel('Combined Score')
ax.set_ylabel('Frequency')
ax.set_title('Combined Score Distribution')
ax.legend()

# 6. Top hallucinations
ax = axes[1, 2]
top10 = df.nlargest(10, 'combined_score')[['question', 'combined_score']]
ax.barh(range(10), top10['combined_score'], color='coral', edgecolor='black')
ax.set_yticks(range(10))
ax.set_yticklabels([q[:45] + '...' if len(q) > 45 else q for q in top10['question']], fontsize=8)
ax.set_xlabel('Combined Score')
ax.set_title('Top 10 Suspected Hallucinations')
ax.invert_yaxis()

plt.tight_layout()
plt.savefig('hallucination_analysis.png', dpi=150)
plt.show()

In [None]:
print("\n" + "=" * 60)
print("SCORE SUMMARY")
print("=" * 60)

metrics = {
    'OOS Score': 'oos_score',
    'Head Disagree (L8)': 'head_disagree_8',
    'Head Disagree (L12)': 'head_disagree_12',
    'Mean Entropy': 'mean_entropy',
    'Mean Prob': 'mean_prob',
    'Paraphrase Mean Sim': 'paraphrase_mean_sim',
    'Paraphrase Std Sim': 'paraphrase_std_sim',
    'Combined Score': 'combined_score',
}

print(f"{'Metric':<22} {'Mean':>10} {'Std':>10} {'Min':>10} {'Max':>10}")
print("-" * 62)
for name, col in metrics.items():
    print(f"{name:<22} {df[col].mean():>10.4f} {df[col].std():>10.4f} {df[col].min():>10.4f} {df[col].max():>10.4f}")

# Save results
df.to_csv('hallucination_scores.csv', index=False)
print("\n✅ Results saved to hallucination_scores.csv")

In [None]:
import os
import json
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt

import torch
from torch.nn import functional as F
from torch.utils.data import DataLoader

from transformers import AutoTokenizer, AutoModelForCausalLM
from datasets import load_dataset
from peft import LoraConfig, get_peft_model
from lion_pytorch import Lion

# Create template and tokenizer for LLM model to be able to chat with it

In [None]:
# Basic question-answer template
template_without_answer = "<|startoftext|><|im_start|>user\n{question}<|im_end|>\n<|im_start|>assistant\n"
template_with_answer = template_without_answer + "{answer}<|im_end|>\n"

# Let's try to put something into the template to see how it looks
print(template_with_answer.format(question="What is your name?", answer="My name is Lili!"))

<|startoftext|><|im_start|>user
What is your name?<|im_end|>
<|im_start|>assistant
My name is Lili!<|im_end|>



In [None]:
# Load the tokenizer for Liquid AI LFM2-1.2B
model_id = "LiquidAI/LFM2-1.2B"
tokenizer = AutoTokenizer.from_pretrained(model_id)

# How big is the tokenizer?
print(f"Vocab size: {len(tokenizer.get_vocab())}")

Vocab size: 64400


In [None]:
# Lets test out both steps:
text = "Here is some sample text!"
print(f"Original text: {text}")

# Tokenize the text
tokens = tokenizer.encode(text, return_tensors="pt")
print(f"Encoded tokens: {tokens}")

# Decode the tokens
decoded_text = tokenizer.decode(tokens[0], skip_special_tokens=True)
print(f"Decoded text: {decoded_text}")

Original text: Here is some sample text!
Encoded tokens: tensor([[   1, 9151,  856, 1429, 6643, 3304,  510]])
Decoded text: Here is some sample text!


In [None]:
prompt = template_without_answer.format(question="What is the capital of France? Use one word.")
print(prompt)

<|startoftext|><|im_start|>user
What is the capital of France? Use one word.<|im_end|>
<|im_start|>assistant



# Load the model -- note that this may take a few minutes
model = AutoModelForCausalLM.from_pretrained(model_id, device_map="auto")

In [None]:
# Load the tokenizer for Liquid AI LFM2-1.2B
model_id = "LiquidAI/LFM2-1.2B"
tokenizer = AutoTokenizer.from_pretrained(model_id)

# How big is the tokenizer?
print(f"Vocab size: {len(tokenizer.get_vocab())}")

Vocab size: 64400


In [None]:
model = AutoModelForCausalLM.from_pretrained(model_id, device_map="auto")

In [None]:
prompt = template_without_answer.format(question="What does MIT stand for?")
tokens = tokenizer.encode(prompt, return_tensors="pt").to(model.device)
output = model.generate(tokens, max_new_tokens=20)
print(tokenizer.decode(output[0]))

<|startoftext|><|startoftext|><|im_start|>user
What does MIT stand for?<|im_end|>
<|im_start|>assistant
MIT stands for Massachusetts Institute of Technology. It is a private, research-intensive university located in


# Understanding the internal representations of the model

In [None]:
# Inspect the model structure
print(model.config)
print(model)

# Check if the model has a specific cache class
from transformers import Cache
print(f"Cache class: {getattr(model.config, 'cache_implementation', 'default')}")

Lfm2Config {
  "architectures": [
    "Lfm2ForCausalLM"
  ],
  "block_auto_adjust_ff_dim": true,
  "block_dim": 2048,
  "block_ff_dim": 12288,
  "block_ffn_dim_multiplier": 1.0,
  "block_mlp_init_scale": 1.0,
  "block_multiple_of": 256,
  "block_norm_eps": 1e-05,
  "block_out_init_scale": 1.0,
  "block_use_swiglu": true,
  "block_use_xavier_init": true,
  "bos_token_id": 1,
  "conv_L_cache": 3,
  "conv_bias": false,
  "conv_dim": 2048,
  "conv_dim_out": 2048,
  "conv_use_xavier_init": true,
  "dtype": "float32",
  "eos_token_id": 7,
  "hidden_size": 2048,
  "initializer_range": 0.02,
  "intermediate_size": 12288,
  "layer_types": [
    "conv",
    "conv",
    "full_attention",
    "conv",
    "conv",
    "full_attention",
    "conv",
    "conv",
    "full_attention",
    "conv",
    "full_attention",
    "conv",
    "full_attention",
    "conv",
    "full_attention",
    "conv"
  ],
  "max_position_embeddings": 128000,
  "model_type": "lfm2",
  "norm_eps": 1e-05,
  "num_attention_heads

In [None]:
# Encode prompt
prompt = template_without_answer.format(question="What does MIT stand for?")
input_ids = tokenizer.encode(prompt, return_tensors="pt").to(model.device)

# Forward pass with cache
with torch.no_grad():
    outputs = model(
        input_ids,
        use_cache=True,
        return_dict=True,
        output_attentions=True,  # Optional: attention weights
        output_hidden_states=True  # Optional: hidden states
    )

# Access cache
past_key_values = outputs.past_key_values
logits = outputs.logits

In [None]:
# Inspect cache structure
print(f"Cache type: {type(past_key_values)}")
print(f"Number of layers in cache: {len(past_key_values)}")

# For each layer, check the structure
for i, layer_cache in enumerate(past_key_values):
    if layer_cache is not None:
        if isinstance(layer_cache, tuple):
            print(f"Layer {i}: K shape={layer_cache[0].shape}, V shape={layer_cache[1].shape}")
        else:
            print(f"Layer {i}: type={type(layer_cache)}")
    else:
        print(f"Layer {i}: None (likely conv layer)")

Cache type: <class 'transformers.models.lfm2.modeling_lfm2.Lfm2HybridConvCache'>
Number of layers in cache: 15
Layer 0: K shape=torch.Size([0]), V shape=torch.Size([0])
Layer 1: K shape=torch.Size([0]), V shape=torch.Size([0])
Layer 2: K shape=torch.Size([1, 8, 16, 64]), V shape=torch.Size([1, 8, 16, 64])
Layer 3: K shape=torch.Size([0]), V shape=torch.Size([0])
Layer 4: K shape=torch.Size([0]), V shape=torch.Size([0])
Layer 5: K shape=torch.Size([1, 8, 16, 64]), V shape=torch.Size([1, 8, 16, 64])
Layer 6: K shape=torch.Size([0]), V shape=torch.Size([0])
Layer 7: K shape=torch.Size([0]), V shape=torch.Size([0])
Layer 8: K shape=torch.Size([1, 8, 16, 64]), V shape=torch.Size([1, 8, 16, 64])
Layer 9: K shape=torch.Size([0]), V shape=torch.Size([0])
Layer 10: K shape=torch.Size([1, 8, 16, 64]), V shape=torch.Size([1, 8, 16, 64])
Layer 11: K shape=torch.Size([0]), V shape=torch.Size([0])
Layer 12: K shape=torch.Size([1, 8, 16, 64]), V shape=torch.Size([1, 8, 16, 64])
Layer 13: K shape=torc

In [None]:
def generate_with_cache_and_probs(model, tokenizer, prompt, max_new_tokens=20):
    input_ids = tokenizer.encode(prompt, return_tensors="pt").to(model.device)

    generated_tokens = []
    token_probs = []
    all_caches = []

    past_key_values = None
    current_ids = input_ids

    for step in range(max_new_tokens):
        with torch.no_grad():
            outputs = model(
                current_ids,
                past_key_values=past_key_values,
                use_cache=True,
                return_dict=True
            )

        # Get logits for last token
        next_token_logits = outputs.logits[:, -1, :]

        # Compute probabilities
        probs = F.softmax(next_token_logits, dim=-1)

        # Sample or greedy decode
        next_token = torch.argmax(next_token_logits, dim=-1, keepdim=True)

        # Get probability of selected token
        selected_prob = probs[0, next_token.item()].item()

        # Store results
        generated_tokens.append(next_token.item())
        token_probs.append(selected_prob)
        all_caches.append(outputs.past_key_values)

        # Check for EOS
        if next_token.item() == tokenizer.eos_token_id:
            break

        # Update for next iteration
        past_key_values = outputs.past_key_values
        current_ids = next_token

    return {
        "input_ids": input_ids,
        "generated_tokens": generated_tokens,
        "token_probs": token_probs,
        "final_cache": all_caches[-1],
        "text": tokenizer.decode(generated_tokens, skip_special_tokens=True)
    }

In [None]:
# Run generation
result = generate_with_cache_and_probs(
    model, tokenizer,
    template_without_answer.format(question="What does MIT stand for?"),
    max_new_tokens=20
)

# Display output
print(f"Generated text: {result['text']}")
print(f"\nToken-by-token breakdown:")
for i, (tok, prob) in enumerate(zip(result['generated_tokens'], result['token_probs'])):
    print(f"  Step {i}: '{tokenizer.decode([tok])}' (prob: {prob:.4f})")

# Analyze final KV cache
final_cache = result['final_cache']
print(f"\nFinal KV Cache Summary:")
for i, layer_cache in enumerate(final_cache):
    if layer_cache is not None and isinstance(layer_cache, tuple):
        k, v = layer_cache
        print(f"  Layer {i}: K={k.shape}, V={v.shape}, K_mem={k.element_size()*k.nelement()/1024:.1f}KB")

Generated text: MIT stands for Massachusetts Institute of Technology. It is a private, research-intensive university located in

Token-by-token breakdown:
  Step 0: 'M' (prob: 0.9520)
  Step 1: 'IT' (prob: 0.9993)
  Step 2: ' stands' (prob: 0.9243)
  Step 3: ' for' (prob: 0.9999)
  Step 4: ' Massachusetts' (prob: 0.8794)
  Step 5: ' Institute' (prob: 0.9990)
  Step 6: ' of' (prob: 0.9999)
  Step 7: ' Technology' (prob: 0.9993)
  Step 8: '.' (prob: 0.8819)
  Step 9: ' It' (prob: 0.8163)
  Step 10: ' is' (prob: 0.5875)
  Step 11: ' a' (prob: 0.8055)
  Step 12: ' private' (prob: 0.3536)
  Step 13: ',' (prob: 0.6832)
  Step 14: ' research' (prob: 0.7299)
  Step 15: '-int' (prob: 0.6954)
  Step 16: 'ensive' (prob: 0.9982)
  Step 17: ' university' (prob: 0.1610)
  Step 18: ' located' (prob: 0.9285)
  Step 19: ' in' (prob: 0.9984)

Final KV Cache Summary:
  Layer 0: K=torch.Size([0]), V=torch.Size([0]), K_mem=0.0KB
  Layer 1: K=torch.Size([0]), V=torch.Size([0]), K_mem=0.0KB
  Layer 2: K=torc

In [None]:
def analyze_kv_cache(cache):
    """Analyze KV cache for hybrid LFM2 model"""
    stats = {
        "total_layers": len(cache),
        "attention_layers": 0,
        "total_memory_bytes": 0,
        "layer_details": []
    }

    for i, layer_cache in enumerate(cache):
        if layer_cache is not None and isinstance(layer_cache, tuple):
            k, v = layer_cache
            layer_mem = (k.element_size() * k.nelement() +
                        v.element_size() * v.nelement())
            stats["attention_layers"] += 1
            stats["total_memory_bytes"] += layer_mem
            stats["layer_details"].append({
                "layer": i,
                "k_shape": list(k.shape),
                "v_shape": list(v.shape),
                "memory_kb": layer_mem / 1024
            })

    return stats

cache_stats = analyze_kv_cache(result['final_cache'])
print(json.dumps(cache_stats, indent=2))

{
  "total_layers": 15,
  "attention_layers": 15,
  "total_memory_bytes": 860160,
  "layer_details": [
    {
      "layer": 0,
      "k_shape": [
        0
      ],
      "v_shape": [
        0
      ],
      "memory_kb": 0.0
    },
    {
      "layer": 1,
      "k_shape": [
        0
      ],
      "v_shape": [
        0
      ],
      "memory_kb": 0.0
    },
    {
      "layer": 2,
      "k_shape": [
        1,
        8,
        35,
        64
      ],
      "v_shape": [
        1,
        8,
        35,
        64
      ],
      "memory_kb": 140.0
    },
    {
      "layer": 3,
      "k_shape": [
        0
      ],
      "v_shape": [
        0
      ],
      "memory_kb": 0.0
    },
    {
      "layer": 4,
      "k_shape": [
        0
      ],
      "v_shape": [
        0
      ],
      "memory_kb": 0.0
    },
    {
      "layer": 5,
      "k_shape": [
        1,
        8,
        35,
        64
      ],
      "v_shape": [
        1,
        8,
        35,
        64
      ],
     

# 4. Calculate the I Do Not Know Score

## Step 1

In [None]:
import torch
import torch.nn.functional as F
import numpy as np
from dataclasses import dataclass
from typing import List, Dict, Optional
import json

@dataclass
class HallucinationFeatures:
    """Features extracted for hallucination detection."""
    # Input
    prompt: str
    input_ids: torch.Tensor

    # Hidden states (per generated token)
    z8_states: List[torch.Tensor]   # Layer 8 hidden states
    z12_states: List[torch.Tensor]  # Layer 12 hidden states

    # KV cache stats (per generated token)
    kv8_head_disagreement: List[float]
    kv12_head_disagreement: List[float]

    # Output
    generated_tokens: List[int]
    token_probs: List[float]
    token_entropies: List[float]
    top5_tokens: List[List[tuple]]
    full_text: str

    # Summary scores
    mean_prob: float
    mean_entropy: float
    mean_head_disagreement_8: float
    mean_head_disagreement_12: float


def compute_head_disagreement(kv_cache_layer) -> float:
    """
    Compute disagreement across KV heads for a single layer.
    Higher = more disagreement = potential hallucination signal.

    kv_cache_layer: tuple of (K, V) each with shape [batch, 8_heads, seq, 64]
    """
    k, v = kv_cache_layer

    if k.numel() == 0:  # Conv layer, no KV cache
        return 0.0

    # Variance across heads (dim=1) for the LAST token position
    # Shape: [batch, heads, head_dim] -> variance over heads
    k_last = k[:, :, -1, :]  # [1, 8, 64]
    v_last = v[:, :, -1, :]

    k_var = k_last.var(dim=1).mean().item()  # Variance across 8 heads
    v_var = v_last.var(dim=1).mean().item()

    return k_var + v_var


def extract_hallucination_features(
    model,
    tokenizer,
    prompt: str,
    max_new_tokens: int = 50
) -> HallucinationFeatures:
    """
    Generate tokens while extracting all features needed for hallucination detection.
    """
    input_ids = tokenizer.encode(prompt, return_tensors="pt").to(model.device)

    # Storage
    z8_states = []
    z12_states = []
    kv8_disagreement = []
    kv12_disagreement = []
    generated_tokens = []
    token_probs = []
    token_entropies = []
    top5_tokens = []

    past_key_values = None
    current_ids = input_ids

    for step in range(max_new_tokens):
        with torch.no_grad():
            outputs = model(
                current_ids,
                past_key_values=past_key_values,
                use_cache=True,
                output_hidden_states=True,  # KEY: Get layer outputs
                return_dict=True
            )

        # === HIDDEN STATES (z8, z12) ===
        # hidden_states is tuple of (embedding, layer0, layer1, ..., layer15)
        # So layer 8 output is at index 9, layer 12 is at index 13
        hidden_states = outputs.hidden_states

        # Get last token's hidden state for the current step
        z8 = hidden_states[9][:, -1, :].clone().cpu()   # [1, 2048]
        z12 = hidden_states[13][:, -1, :].clone().cpu() # [1, 2048]

        z8_states.append(z8)
        z12_states.append(z12)

        # === KV CACHE HEAD DISAGREEMENT ===
        cache = outputs.past_key_values
        kv8_disagreement.append(compute_head_disagreement(cache[8]))
        kv12_disagreement.append(compute_head_disagreement(cache[12]))

        # === LOGITS & PROBABILITIES ===
        logits = outputs.logits[:, -1, :]  # [1, vocab_size]
        probs = F.softmax(logits, dim=-1)

        # Entropy of distribution
        entropy = -(probs * torch.log(probs + 1e-10)).sum().item()

        # Greedy selection
        next_token = torch.argmax(logits, dim=-1)
        selected_prob = probs[0, next_token.item()].item()

        # Top-5
        top5_probs, top5_idx = torch.topk(probs[0], 5)
        top5 = [(tokenizer.decode([idx.item()]), prob.item())
                for idx, prob in zip(top5_idx, top5_probs)]

        # Store
        generated_tokens.append(next_token.item())
        token_probs.append(selected_prob)
        token_entropies.append(entropy)
        top5_tokens.append(top5)

        # Check EOS
        if next_token.item() == tokenizer.eos_token_id:
            break

        # Update for next step
        past_key_values = outputs.past_key_values
        current_ids = next_token.unsqueeze(0)

    # === BUILD RESULT ===
    return HallucinationFeatures(
        prompt=prompt,
        input_ids=input_ids.cpu(),
        z8_states=z8_states,
        z12_states=z12_states,
        kv8_head_disagreement=kv8_disagreement,
        kv12_head_disagreement=kv12_disagreement,
        generated_tokens=generated_tokens,
        token_probs=token_probs,
        token_entropies=token_entropies,
        top5_tokens=top5_tokens,
        full_text=tokenizer.decode(generated_tokens, skip_special_tokens=True),
        mean_prob=np.mean(token_probs),
        mean_entropy=np.mean(token_entropies),
        mean_head_disagreement_8=np.mean(kv8_disagreement),
        mean_head_disagreement_12=np.mean(kv12_disagreement),
    )

# Step 2: Unsupervised z8 → z12 Predictor

In [None]:
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

class Z8toZ12Predictor(nn.Module):
    """
    Predict z12 from z8. High prediction error = out-of-support.
    """
    def __init__(self, hidden_dim=2048, bottleneck_dim=512):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(hidden_dim, bottleneck_dim),
            nn.GELU(),
            nn.LayerNorm(bottleneck_dim),
            nn.Linear(bottleneck_dim, bottleneck_dim),
            nn.GELU(),
            nn.LayerNorm(bottleneck_dim),
            nn.Linear(bottleneck_dim, hidden_dim),
        )

    def forward(self, z8):
        return self.net(z8)

    def prediction_error(self, z8, z12):
        """MSE between predicted and actual z12."""
        z12_pred = self.forward(z8)
        return F.mse_loss(z12_pred, z12, reduction='none').mean(dim=-1)


class Z8Z12LogitsPredictor(nn.Module):
    """
    Extended predictor: z8 → z12 → logits_summary
    Captures full information flow consistency.
    """
    def __init__(self, hidden_dim=2048, vocab_size=65536, bottleneck_dim=512):
        super().__init__()

        # z8 → z12 predictor
        self.z8_to_z12 = nn.Sequential(
            nn.Linear(hidden_dim, bottleneck_dim),
            nn.GELU(),
            nn.Linear(bottleneck_dim, hidden_dim),
        )

        # z12 → logits summary (top-k probs, entropy)
        # We predict a summary, not full vocab logits
        self.z12_to_logits_summary = nn.Sequential(
            nn.Linear(hidden_dim, bottleneck_dim),
            nn.GELU(),
            nn.Linear(bottleneck_dim, 6),  # [entropy, top1_prob, top2_prob, ...]
        )

    def forward(self, z8):
        z12_pred = self.z8_to_z12(z8)
        logits_summary_pred = self.z12_to_logits_summary(z12_pred)
        return z12_pred, logits_summary_pred

    def compute_oos_score(self, z8, z12, logits_summary):
        """
        Out-of-support score combining both prediction errors.
        """
        z12_pred, logits_pred = self.forward(z8)

        err_z12 = F.mse_loss(z12_pred, z12, reduction='none').mean(dim=-1)
        err_logits = F.mse_loss(logits_pred, logits_summary, reduction='none').mean(dim=-1)

        # Weighted combination
        return 0.7 * err_z12 + 0.3 * err_logits


class HallucinationDataset(Dataset):
    """Dataset for training the predictor."""
    def __init__(self, features_list: List[HallucinationFeatures]):
        self.samples = []

        for feat in features_list:
            for i, (z8, z12, prob, entropy) in enumerate(zip(
                feat.z8_states,
                feat.z12_states,
                feat.token_probs,
                feat.token_entropies
            )):
                # Create logits summary: [entropy, top5_probs]
                top5_probs = [p for _, p in feat.top5_tokens[i]]
                logits_summary = torch.tensor([entropy] + top5_probs, dtype=torch.float32)

                self.samples.append({
                    'z8': z8.squeeze(0),
                    'z12': z12.squeeze(0),
                    'logits_summary': logits_summary,
                    'prob': prob,
                    'entropy': entropy,
                })

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        return self.samples[idx]


def train_predictor(features_list: List[HallucinationFeatures], epochs=10, lr=1e-4):
    """Train the z8 → z12 → logits predictor."""
    dataset = HallucinationDataset(features_list)
    loader = DataLoader(dataset, batch_size=32, shuffle=True)

    predictor = Z8Z12LogitsPredictor()
    optimizer = torch.optim.AdamW(predictor.parameters(), lr=lr)

    predictor.train()
    for epoch in range(epochs):
        total_loss = 0
        for batch in loader:
            z8 = batch['z8']
            z12 = batch['z12']
            logits_summary = batch['logits_summary']

            z12_pred, logits_pred = predictor(z8)

            loss_z12 = F.mse_loss(z12_pred, z12)
            loss_logits = F.mse_loss(logits_pred, logits_summary)
            loss = 0.7 * loss_z12 + 0.3 * loss_logits

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        print(f"Epoch {epoch+1}/{epochs} - Loss: {total_loss/len(loader):.6f}")

    return predictor

## Step 3: Head Disagreement Score (Refined)

In [None]:
def compute_sequence_head_disagreement(features: HallucinationFeatures) -> Dict:
    """
    Compute detailed head disagreement statistics.
    """
    kv8 = np.array(features.kv8_head_disagreement)
    kv12 = np.array(features.kv12_head_disagreement)

    return {
        # Layer 8
        "kv8_mean": kv8.mean(),
        "kv8_std": kv8.std(),
        "kv8_max": kv8.max(),
        "kv8_trend": np.polyfit(range(len(kv8)), kv8, 1)[0] if len(kv8) > 1 else 0,

        # Layer 12
        "kv12_mean": kv12.mean(),
        "kv12_std": kv12.std(),
        "kv12_max": kv12.max(),
        "kv12_trend": np.polyfit(range(len(kv12)), kv12, 1)[0] if len(kv12) > 1 else 0,

        # Cross-layer
        "kv_ratio": kv12.mean() / (kv8.mean() + 1e-8),
        "kv_correlation": np.corrcoef(kv8, kv12)[0, 1] if len(kv8) > 1 else 0,
    }

# Step 4: Integration with Paraphrase Dataset

In [None]:
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.preprocessing import normalize
from tqdm import tqdm

def run_full_hallucination_analysis(model, tokenizer, csv_path: str):
    """
    Run complete analysis pipeline on paraphrase dataset.
    """
    # Load data
    df = pd.read_csv(csv_path)

    # === STEP 1: Extract features for all questions ===
    print("Extracting internal features...")
    all_features = []
    for question in tqdm(df['question'], desc="Processing"):
        prompt = template_without_answer.format(question=question)
        features = extract_hallucination_features(model, tokenizer, prompt, max_new_tokens=100)
        all_features.append(features)

    df['llm_output'] = [f.full_text for f in all_features]
    df['mean_prob'] = [f.mean_prob for f in all_features]
    df['mean_entropy'] = [f.mean_entropy for f in all_features]
    df['head_disagree_8'] = [f.mean_head_disagreement_8 for f in all_features]
    df['head_disagree_12'] = [f.mean_head_disagreement_12 for f in all_features]

    # === STEP 2: Train z8 → z12 predictor ===
    print("\nTraining z8 → z12 predictor...")
    predictor = train_predictor(all_features, epochs=20)

    # === STEP 3: Compute OOS scores ===
    print("\nComputing out-of-support scores...")
    predictor.eval()
    oos_scores = []

    for feat in all_features:
        scores = []
        for z8, z12, prob, entropy in zip(
            feat.z8_states, feat.z12_states,
            feat.token_probs, feat.token_entropies
        ):
            top5_probs = [p for _, p in feat.top5_tokens[feat.z8_states.index(z8)]]
            logits_summary = torch.tensor([entropy] + top5_probs[:5])

            with torch.no_grad():
                score = predictor.compute_oos_score(
                    z8.squeeze(0).unsqueeze(0),
                    z12.squeeze(0).unsqueeze(0),
                    logits_summary.unsqueeze(0)
                )
            scores.append(score.item())

        oos_scores.append(np.mean(scores))

    df['oos_score'] = oos_scores

    # === STEP 4: Paraphrase consistency (TF-IDF) ===
    print("\nComputing paraphrase consistency...")
    vectorizer = TfidfVectorizer()
    embeddings = vectorizer.fit_transform(df['llm_output']).toarray()
    embeddings = normalize(embeddings)
    cos_sim_matrix = cosine_similarity(embeddings)

    # Group by category and compute consistency
    paraphrase_stats = []
    for idx, row in df.iterrows():
        category = row['category']
        same_category = df[df['category'] == category].index.tolist()

        if len(same_category) > 1:
            sims = [cos_sim_matrix[idx, j] for j in same_category if j != idx]
            paraphrase_stats.append({
                'mean_sim': np.mean(sims),
                'std_sim': np.std(sims),
                'min_sim': np.min(sims),
            })
        else:
            paraphrase_stats.append({'mean_sim': 1.0, 'std_sim': 0.0, 'min_sim': 1.0})

    df['paraphrase_mean_sim'] = [s['mean_sim'] for s in paraphrase_stats]
    df['paraphrase_std_sim'] = [s['std_sim'] for s in paraphrase_stats]
    df['paraphrase_min_sim'] = [s['min_sim'] for s in paraphrase_stats]

    return df, predictor, all_features


# === RUN ANALYSIS ===
df_results, predictor, all_features = run_full_hallucination_analysis(
    model, tokenizer,
    "AI_Bullshit_Detector/data/paraphase-prompts.csv"
)

Extracting internal features...


Processing: 100%|██████████| 100/100 [05:02<00:00,  3.03s/it]



Training z8 → z12 predictor...


RuntimeError: mat1 and mat2 must have the same dtype, but got Half and Float

## Step 5: Compare & Visualize Scores


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

def visualize_score_comparison(df):
    """Compare all hallucination detection strategies."""

    fig, axes = plt.subplots(2, 3, figsize=(15, 10))

    # 1. OOS Score vs Paraphrase Consistency
    ax = axes[0, 0]
    ax.scatter(df['oos_score'], df['paraphrase_mean_sim'], alpha=0.6, c=df['mean_entropy'], cmap='viridis')
    ax.set_xlabel('OOS Score (z8→z12 error)')
    ax.set_ylabel('Paraphrase Mean Similarity')
    ax.set_title('OOS Score vs Paraphrase Consistency')

    # 2. Head Disagreement vs Paraphrase Spread
    ax = axes[0, 1]
    ax.scatter(df['head_disagree_12'], df['paraphrase_std_sim'], alpha=0.6)
    ax.set_xlabel('Head Disagreement (Layer 12)')
    ax.set_ylabel('Paraphrase Similarity Spread')
    ax.set_title('Head Disagreement vs Paraphrase Instability')

    # 3. Correlation heatmap
    ax = axes[0, 2]
    score_cols = ['oos_score', 'head_disagree_8', 'head_disagree_12',
                  'mean_entropy', 'mean_prob', 'paraphrase_mean_sim', 'paraphrase_std_sim']
    corr = df[score_cols].corr()
    sns.heatmap(corr, annot=True, fmt='.2f', ax=ax, cmap='RdBu_r', center=0)
    ax.set_title('Score Correlations')

    # 4. Distribution of OOS scores by category
    ax = axes[1, 0]
    df.boxplot(column='oos_score', by='category', ax=ax, rot=45)
    ax.set_title('OOS Score Distribution by Category')

    # 5. Combined hallucination score
    # Normalize and combine
    from sklearn.preprocessing import MinMaxScaler
    scaler = MinMaxScaler()

    df['combined_score'] = (
        0.4 * scaler.fit_transform(df[['oos_score']]) +
        0.3 * scaler.fit_transform(df[['head_disagree_12']]) +
        0.3 * (1 - scaler.fit_transform(df[['paraphrase_mean_sim']]))
    ).flatten()

    ax = axes[1, 1]
    ax.hist(df['combined_score'], bins=30, edgecolor='black', alpha=0.7)
    ax.set_xlabel('Combined Hallucination Score')
    ax.set_ylabel('Frequency')
    ax.set_title('Distribution of Combined Hallucination Score')
    ax.axvline(df['combined_score'].quantile(0.9), color='red', linestyle='--', label='90th percentile')
    ax.legend()

    # 6. Top suspected hallucinations
    ax = axes[1, 2]
    top_hallucinations = df.nlargest(10, 'combined_score')[['question', 'combined_score']]
    ax.barh(range(10), top_hallucinations['combined_score'])
    ax.set_yticks(range(10))
    ax.set_yticklabels([q[:40] + '...' for q in top_hallucinations['question']], fontsize=8)
    ax.set_xlabel('Combined Score')
    ax.set_title('Top 10 Suspected Hallucinations')

    plt.tight_layout()
    plt.savefig('hallucination_analysis.png', dpi=150)
    plt.show()

    return df

df_final = visualize_score_comparison(df_results)
df_final.to_csv('hallucination_scores.csv', index=False)

## Cosine similary of paraphrased sentences --> used in testing the Iinternal representation model (IRM) and output.

In [None]:
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import normalize
from sklearn.metrics.pairwise import cosine_similarity
from tqdm import tqdm
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
# --- 1. Load CSV ---
#!rm -rf AI_Bullshit_Detector
# Clone the repo
!git clone https://github.com/AvdMei/AI_Bullshit_Detector
df = pd.read_csv("AI_Bullshit_Detector/data/paraphase-prompts.csv")
print(df.head())
assert "category" in df.columns and "question" in df.columns
# --- 2. Load Liquid AI model and tokenizer ---
model_id = "LiquidAI/LFM2-1.2B"
tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForCausalLM.from_pretrained(model_id, torch_dtype=torch.float16)
model.eval()
device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)
# --- 3. Templates ---
template_without_answer = "<|startoftext|><|im_start|>user\n{question}<|im_end|>\n<|im_start|>assistant\n"
# --- 4. Function to generate response ---
def generate_llm_output(question, max_tokens=200):
    prompt = template_without_answer.format(question=question)
    inputs = tokenizer(prompt, return_tensors="pt").to(device)
    with torch.no_grad():
        output_ids = model.generate(
            **inputs,
            max_new_tokens=max_tokens,
            do_sample=False  # deterministic for cosine similarity
        )
    output_text = tokenizer.decode(output_ids[0], skip_special_tokens=True)
    # Remove the prompt part to get only the assistant answer
    if "<|im_start|>assistant" in output_text:
        answer = output_text.split("<|im_start|>assistant")[1].strip()
    else:
        answer = output_text
    return answer
# --- 5. Generate LLM outputs ---
llm_outputs = []
for q in tqdm(df['question'], desc="Generating outputs"):
    try:
        out = generate_llm_output(q)
    except Exception as e:
        out = f"ERROR: {e}"
    llm_outputs.append(out)
df['llm_output'] = llm_outputs
# --- 6. Convert outputs to embeddings using TF-IDF ---
vectorizer = TfidfVectorizer()
embeddings = vectorizer.fit_transform(df['llm_output']).toarray()
embeddings = normalize(embeddings)
# --- 7. Compute cosine similarity ---
cos_sim_matrix = cosine_similarity(embeddings)
# --- 8. Save similarity matrix to CSV ---
sim_df = pd.DataFrame(cos_sim_matrix, index=df['question'], columns=df['question'])
sim_df.to_csv("AI_Bullshit_Detector/data/llm_output_cosine_similarity.csv")
print("Cosine similarity matrix saved to data/llm_output_cosine_similarity.csv")
# --- 9. Save outputs alongside prompts ---
df.to_csv("AI_Bullshit_Detector/data/paraphases_with_llm_output.csv", index=False)
print("LLM outputs saved to data/paraphases_with_llm_output.csv")

Cloning into 'AI_Bullshit_Detector'...
remote: Enumerating objects: 139, done.[K
remote: Counting objects: 100% (139/139), done.[K
remote: Compressing objects: 100% (109/109), done.[K
remote: Total 139 (delta 46), reused 92 (delta 16), pack-reused 0 (from 0)[K
Receiving objects: 100% (139/139), 457.96 KiB | 7.27 MiB/s, done.
Resolving deltas: 100% (46/46), done.
                           category  \
0  Software Engineering - Debugging   
1  Software Engineering - Debugging   
2  Software Engineering - Debugging   
3  Software Engineering - Debugging   
4  Software Engineering - Debugging   

                                            question  
0  What is the first step an experienced engineer...  
1  When a bug is reported, what should be done fi...  
2  What is the initial action a senior engineer t...  
3  Before changing code to fix a bug, what should...  
4  What is the most important first step in a pro...  


`torch_dtype` is deprecated! Use `dtype` instead!
Generating outputs: 100%|██████████| 100/100 [07:49<00:00,  4.70s/it]

Cosine similarity matrix saved to data/llm_output_cosine_similarity.csv
LLM outputs saved to data/paraphases_with_llm_output.csv





Liquid AI's LFM2-1.2B