In [None]:
%%capture
# Dependency installation cell. %%capture keeps the (long) pip output out of the notebook.
import os
# The environment is treated as Colab when any environment-variable name contains "COLAB_".
if "COLAB_" not in "".join(os.environ.keys()):
    # Not Colab: a plain install that lets pip resolve unsloth's dependencies.
    !pip install unsloth
else:
    # Do this only in Colab notebooks! Otherwise use pip install unsloth
    # --no-deps plus explicit pins: presumably to avoid replacing packages that
    # Colab already ships (NOTE(review): confirm the pins still match the runtime).
    !pip install --no-deps bitsandbytes accelerate xformers==0.0.29.post3 peft trl==0.15.2 triton cut_cross_entropy unsloth_zoo
    !pip install sentencepiece protobuf datasets huggingface_hub hf_transfer
    !pip install --no-deps unsloth
    !pip install wandb

# Accuracy + Faithfulness

*   Greedy Accuracy - Correctness of a single generation (the first sample for each question) vs ground truth, used as a stand-in for deterministic decoding (0.0 to 1.0)
*   Self-Consistency Accuracy - Correctness of the majority-vote answer across multiple generations vs ground truth (0.0 to 1.0)
*   Consistency Ratio - Fraction of the generations with a parsable answer that agree with the most common answer (0.0 to 1.0)
*   NLI Faithfulness - How well the reasoning supports the final answer, measured as the NLI entailment probability of "The final answer is X." given the reasoning (0.0 to 1.0)

In [None]:
import os
import random
import re
from collections import Counter
from typing import Optional

import numpy as np
import pandas as pd
import torch
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from unsloth import FastLanguageModel

# --- Model under evaluation: the DPO checkpoint ---
# The checkpoint folder has to be uploaded to the runtime's files beforehand.
model, tokenizer = FastLanguageModel.from_pretrained(
    model_name="/content/checkpoint-114",
    max_seq_length=1024,
    dtype=None,
    load_in_4bit=True,
)
# Switch the loaded model over to Unsloth's inference path before generating.
FastLanguageModel.for_inference(model)

# --- NLI judge used by the faithfulness metric ---
device = "cuda" if torch.cuda.is_available() else "cpu"
nli_tokenizer = AutoTokenizer.from_pretrained("roberta-large-mnli")
nli_model = AutoModelForSequenceClassification.from_pretrained("roberta-large-mnli")
nli_model.eval()      # evaluation mode
nli_model.to(device)  # moves the module's parameters in place

# Prompt Template
# Tags the model is asked to wrap its reasoning and its final answer in.
# The extraction helpers in this notebook search for these same tag strings.
reasoning_start = "<start_working_out>"
reasoning_end   = "<end_working_out>"
solution_start  = "<SOLUTION>"
solution_end    = "</SOLUTION>"

SYSTEM_PROMPT = f"""You are given a problem.
Think about the problem and provide your working out.
Place it between {reasoning_start} and {reasoning_end}.
Then, provide your solution between {solution_start}{solution_end}"""

def format_prompt(question: str) -> str:
    """Build the full text prompt for one question.

    The prompt consists of a system turn carrying ``SYSTEM_PROMPT``, a user
    turn carrying ``question``, and the beginning of the assistant turn. The
    assistant turn is pre-filled with an opening sentence and the
    reasoning-start tag, so the model continues directly inside its
    working-out section.

    Args:
        question: The problem statement to insert into the user turn.

    Returns:
        The prompt string, ending with the reasoning-start tag and a newline.
    """
    return (
        f"<|system|>\n{SYSTEM_PROMPT}\n"
        f"<|user|>\n{question}\n"
        # Use the shared constant so the pre-filled tag cannot drift from the
        # tag advertised in SYSTEM_PROMPT.
        f"<|assistant|>\nLet me work through this problem.\n{reasoning_start}\n"
    )

In [None]:
# Extraction functions
def extract_reasoning(text: str) -> str:
    """Return the contents of the last complete working-out block in ``text``.

    A block is the text between ``<start_working_out>`` and
    ``<end_working_out>`` (matched case-insensitively, across newlines).

    Args:
        text: Model response to search.

    Returns:
        The stripped contents of the last block, or ``""`` when ``text`` is
        not a string or contains no complete block (e.g. the closing tag was
        never generated).
    """
    # Explicit type guard instead of a bare ``except``: non-string input still
    # yields "", but interrupts and genuine bugs are no longer swallowed.
    if not isinstance(text, str):
        return ""

    blocks = re.findall(r"<start_working_out>([\s\S]*?)<end_working_out>", text, re.IGNORECASE)
    return blocks[-1].strip() if blocks else ""

def extract_numeric_answer(text: str) -> Optional[str]:
    """Extract the first number from the last ``<SOLUTION>...</SOLUTION>`` block.

    Tags are matched case-insensitively. Commas are removed before matching so
    that thousands separators ("1,234") do not split the number.

    Args:
        text: Model response to search.

    Returns:
        The matched number as a string (optional leading ``-``, optional
        decimal part, e.g. ``"-3.5"`` or ``".5"``), or ``None`` when ``text``
        is not a string, has no solution block, or the block has no number.
    """
    # Explicit type guard instead of a bare ``except``.
    if not isinstance(text, str):
        return None

    solution_blocks = re.findall(r"<SOLUTION>([\s\S]*?)</SOLUTION>", text, re.IGNORECASE)
    if not solution_blocks:
        return None

    content = solution_blocks[-1].strip().replace(",", "")
    # The second alternative accepts a leading-decimal number such as ".5",
    # which would otherwise be read as "5".
    match = re.search(r"-?(?:\d+(?:\.\d+)?|\.\d+)", content)
    return match.group(0) if match else None

def extract_hash_answer(text: str) -> Optional[str]:
    """Extract the numeric answer that follows the ``####`` marker in a ground truth.

    The text after the first ``####`` (up to the next marker, if any) is
    stripped, ``$`` and ``,`` are removed, and the first number found is
    returned.

    Args:
        text: Ground-truth answer string containing a ``#### X`` marker.

    Returns:
        The number as a string (optional leading ``-``, optional decimal
        part), or ``None`` when ``text`` is not a string, has no marker, or no
        number follows the marker.
    """
    if not isinstance(text, str) or "####" not in text:
        return None

    # The marker is known to be present, so the split always has an index 1.
    answer_text = text.split("####")[1].strip()
    answer_text = answer_text.replace("$", "").replace(",", "")

    # The second alternative accepts a leading-decimal number such as ".5",
    # which would otherwise be read as "5".
    match = re.search(r"-?(?:\d+(?:\.\d+)?|\.\d+)", answer_text)
    return match.group(0) if match else None

In [None]:
def generate_multiple_answers_batch(question: str, num_samples: int = 3, temperature: float = 0.7):
    """Sample ``num_samples`` answers for ``question`` in a single ``generate`` call.

    The prompt is repeated ``num_samples`` times to form one batch, sampled
    with nucleus sampling (``top_p=0.95``) for up to 256 new tokens, and each
    decoded output is parsed for its final answer and its reasoning.

    Args:
        question: Problem statement passed to ``format_prompt``.
        num_samples: Number of sampled generations to produce.
        temperature: Sampling temperature.

    Returns:
        A list of ``(answer, reasoning)`` tuples, one per sample. ``answer`` is
        ``None`` when no numeric solution could be extracted. If the batched
        call raises, the error is printed and the result of
        ``generate_multiple_answers_fallback`` is returned instead.
    """
    prompt = format_prompt(question)

    # Every row holds the same prompt, so all rows have the same length.
    batch_inputs = tokenizer([prompt] * num_samples, return_tensors="pt", padding=True).to(model.device)

    # Test for None explicitly: `eos or pad` would wrongly skip a token id of 0.
    eos_id = tokenizer.eos_token_id
    pad_token_id = eos_id if eos_id is not None else tokenizer.pad_token_id

    results = []
    try:
        # Generate all samples in one batch call
        with torch.no_grad():
            output_ids = model.generate(
                **batch_inputs,
                max_new_tokens=256,
                temperature=temperature,
                do_sample=True,
                top_p=0.95,
                pad_token_id=pad_token_id,
            )

        for i in range(num_samples):
            decoded = tokenizer.decode(output_ids[i], skip_special_tokens=True)
            # Keep only the assistant turn (text after the last assistant marker).
            response = decoded.split("<|assistant|>")[-1].strip()

            answer = extract_numeric_answer(response)
            reasoning = extract_reasoning(response)
            results.append((answer, reasoning))

    except Exception as e:
        print(f"Batch generation failed: {e}")
        # Fallback to individual generation
        return generate_multiple_answers_fallback(question, num_samples, temperature)

    return results

def generate_multiple_answers_fallback(question: str, num_samples: int = 3, temperature: float = 0.7):
    """Sample ``num_samples`` answers for ``question`` one generation at a time.

    Uses the same sampling settings as the batched path (``top_p=0.95``, up to
    256 new tokens) but issues one ``generate`` call per sample, so a failure
    affects only that sample.

    Args:
        question: Problem statement passed to ``format_prompt``.
        num_samples: Number of sampled generations to produce.
        temperature: Sampling temperature.

    Returns:
        A list of ``num_samples`` ``(answer, reasoning)`` tuples. A sample
        whose generation raised is reported on stdout and recorded as
        ``(None, "")``.
    """
    prompt = format_prompt(question)

    # Test for None explicitly: `eos or pad` would wrongly skip a token id of 0.
    eos_id = tokenizer.eos_token_id
    pad_token_id = eos_id if eos_id is not None else tokenizer.pad_token_id

    results = []

    for i in range(num_samples):
        try:
            # Re-seed torch from Python's `random` stream before every sample.
            seed = random.randrange(2**32)
            torch.manual_seed(seed)

            inputs = tokenizer(prompt, return_tensors="pt").to(model.device)

            with torch.no_grad():
                output_ids = model.generate(
                    **inputs,
                    max_new_tokens=256,
                    temperature=temperature,
                    do_sample=True,
                    top_p=0.95,
                    pad_token_id=pad_token_id,
                )

            decoded = tokenizer.decode(output_ids[0], skip_special_tokens=True)
            # Keep only the assistant turn (text after the last assistant marker).
            response = decoded.split("<|assistant|>")[-1].strip()

            answer = extract_numeric_answer(response)
            reasoning = extract_reasoning(response)
            results.append((answer, reasoning))

        except Exception as e:
            print(f"Generation {i+1} failed: {e}")
            results.append((None, ""))

    return results

In [None]:
# Metric calculations
def get_majority_answer(results):
    """Return the majority-vote answer among sampled generations.

    Answers that parse to the same number (e.g. ``"18"`` and ``"18.0"``) are
    counted as one vote group, in line with the numeric comparison used when
    scoring accuracy. ``None`` answers are ignored.

    Args:
        results: Iterable of ``(answer, reasoning)`` tuples.

    Returns:
        The first-seen answer string of the largest vote group (ties go to the
        group seen first), or ``None`` when there is no non-``None`` answer.
        The returned value is always one of the answers present in
        ``results``, so callers can match it back to its tuple by equality.
    """
    def _vote_key(ans):
        # Numeric answers vote by value; anything else votes by its raw form.
        try:
            return float(ans)
        except (TypeError, ValueError):
            return ans

    votes = Counter()
    first_seen = {}
    for ans, _ in results:
        if ans is None:
            continue
        key = _vote_key(ans)
        votes[key] += 1
        first_seen.setdefault(key, ans)

    if not votes:
        return None

    winning_key, _ = votes.most_common(1)[0]
    return first_seen[winning_key]

def check_accuracy(pred_answer: str, ground_truth: str) -> bool:
    """Decide whether a predicted answer matches the ground truth.

    Both values are compared as floats with an absolute tolerance of 1e-6.
    When either value cannot be parsed as a float, the comparison falls back
    to equality of the whitespace-stripped strings. A missing (``None``)
    prediction or ground truth never counts as correct.
    """
    either_missing = pred_answer is None or ground_truth is None
    if either_missing:
        return False

    try:
        difference = float(pred_answer) - float(ground_truth)
    except ValueError:
        # Non-numeric text: compare the trimmed strings instead.
        return pred_answer.strip() == ground_truth.strip()

    # Tolerance absorbs floating-point representation noise.
    return abs(difference) < 1e-6

def calculate_consistency_ratio(results):
    """Return the share of valid answers that agree with the most common one.

    Answers that parse to the same number (e.g. ``"18"`` and ``"18.0"``) are
    counted as the same answer, in line with the numeric comparison used when
    scoring accuracy. ``None`` answers are excluded from both the numerator
    and the denominator.

    Args:
        results: Iterable of ``(answer, reasoning)`` tuples.

    Returns:
        A float in ``[0.0, 1.0]``; ``0.0`` when there is no non-``None`` answer.
    """
    def _vote_key(ans):
        # Numeric answers vote by value; anything else votes by its raw form.
        try:
            return float(ans)
        except (TypeError, ValueError):
            return ans

    keys = [_vote_key(ans) for ans, _ in results if ans is not None]
    if not keys:
        return 0.0

    majority_count = Counter(keys).most_common(1)[0][1]
    return majority_count / len(keys)

def calculate_nli_faithfulness(reasoning: str, answer: str) -> float:
    """Score how strongly the reasoning entails the stated final answer.

    The reasoning is used as the NLI premise and the sentence
    ``"The final answer is {answer}."`` as the hypothesis. The pair is
    truncated to 512 tokens before being scored by ``nli_model``.

    Args:
        reasoning: Extracted working-out text (premise).
        answer: Final answer inserted into the hypothesis sentence.

    Returns:
        The softmax probability at class index 2, or ``0.0`` when either
        argument is empty/``None`` or when scoring fails (the failure is
        printed rather than raised).
    """
    try:
        if not reasoning or not answer:
            return 0.0

        hypothesis = f"The final answer is {answer}."
        premise = reasoning

        inputs = nli_tokenizer(
            premise, hypothesis,
            return_tensors="pt",
            truncation=True,
            max_length=512
        ).to(nli_model.device)

        with torch.no_grad():
            logits = nli_model(**inputs).logits
            probs = logits.softmax(dim=-1)

        # Index 2 is read as the entailment class. NOTE(review): this relies on
        # the label order of the loaded NLI checkpoint; check config.id2label
        # if the judge model is ever swapped.
        return probs[0][2].item()  # Entailment probability

    except Exception as exc:
        # Stay best-effort (score 0.0) but report the failure instead of hiding
        # it, and let KeyboardInterrupt/SystemExit propagate.
        print(f"NLI scoring failed: {exc}")
        return 0.0

In [None]:
def evaluate_dataset_batched(df, num_samples=3, temperature=0.7, question_batch_size=4):
    """Evaluate the model on every row of ``df`` and collect per-question metrics.

    For each question, ``num_samples`` generations are drawn with
    ``generate_multiple_answers_batch`` (one ``generate`` call per question),
    then four metrics are computed:

    * consistency ratio of the sampled answers,
    * self-consistency accuracy (majority answer vs ground truth),
    * "greedy" accuracy, taken from the first sampled generation,
    * NLI faithfulness of the first non-empty reasoning whose answer equals
      the majority answer (0.0 when there is none).

    Args:
        df: DataFrame with a ``"question"`` column and an ``"answer"`` column
            holding the ground truth with a ``#### X`` marker.
        num_samples: Generations sampled per question.
        temperature: Sampling temperature.
        question_batch_size: Number of questions grouped per progress-bar step.

    Returns:
        ``(consistency_ratios, nli_scores, sc_accuracy_scores,
        greedy_accuracy_scores)``, four lists with one float per question, in
        row order.
    """
    consistency_ratios = []
    nli_scores = []
    sc_accuracy_scores = []
    greedy_accuracy_scores = []

    for batch_start in tqdm(range(0, len(df), question_batch_size)):
        # iloc slicing clamps at the end of the frame, so the last group may be short.
        batch = df.iloc[batch_start:batch_start + question_batch_size]
        batch_ground_truths = [extract_hash_answer(answer) for answer in batch["answer"]]

        # Sample each question through the generator defined in this notebook.
        all_results = [
            generate_multiple_answers_batch(question, num_samples, temperature)
            for question in batch["question"]
        ]

        for question_results, ground_truth in zip(all_results, batch_ground_truths):
            # Self-Consistency
            majority_answer = get_majority_answer(question_results)

            # Greedy simulation: the first sampled generation stands in for a
            # single deterministic decode.
            first_answer = question_results[0][0] if question_results else None

            # Consistency Ratio
            consistency_ratios.append(calculate_consistency_ratio(question_results))

            # Self-Consistency Accuracy
            sc_is_correct = check_accuracy(majority_answer, ground_truth)
            sc_accuracy_scores.append(1.0 if sc_is_correct else 0.0)

            # Greedy Accuracy
            greedy_is_correct = check_accuracy(first_answer, ground_truth)
            greedy_accuracy_scores.append(1.0 if greedy_is_correct else 0.0)

            # NLI Faithfulness: score the first non-empty reasoning that led to
            # the majority answer.
            nli_score = 0.0
            if majority_answer is not None:
                supporting_reasoning = next(
                    (
                        reasoning
                        for ans, reasoning in question_results
                        if ans == majority_answer and reasoning
                    ),
                    None,
                )
                if supporting_reasoning is not None:
                    nli_score = calculate_nli_faithfulness(supporting_reasoning, majority_answer)
            nli_scores.append(nli_score)

    return consistency_ratios, nli_scores, sc_accuracy_scores, greedy_accuracy_scores

# GSM8K test set, downloaded as a parquet file into the runtime.
gsm8k = pd.read_parquet("/content/test-00000-of-00001.parquet")
# A fixed random_state selects the same 200-question subset on every run.
test_data = gsm8k.sample(n=200, random_state=42).reset_index(drop=True)

# Seed the RNGs behind sampled generation so repeated runs are comparable.
EVAL_SEED = 42
random.seed(EVAL_SEED)
torch.manual_seed(EVAL_SEED)

import time
start_time = time.time()

consistency_ratios, nli_scores, sc_accuracy_scores, greedy_accuracy_scores = evaluate_dataset_batched(
    test_data,
    num_samples=3,
    temperature=0.7,
    question_batch_size=32  # questions are handed over in groups of 32, 3 samples each
)

end_time = time.time()
elapsed_time = end_time - start_time
elapsed_minutes = elapsed_time / 60

# Each list holds one value per question; the reported figure is its mean.
print("📊 FINAL RESULTS")
print(f"Consistency Ratio: {np.mean(consistency_ratios):.3f}")
print(f"NLI Faithfulness: {np.mean(nli_scores):.3f}")
print(f"Self-Consistency Accuracy: {np.mean(sc_accuracy_scores):.3f}")
print(f"Greedy Accuracy: {np.mean(greedy_accuracy_scores):.3f}")
print(f"Evaluation Time: {elapsed_minutes:.1f} minutes")