In [None]:
import numpy as np
import re
import json
import requests


from transformers import pipeline
import json
import requests
from typing import List, Dict, Tuple, Optional
import logging

# Emit INFO-and-above records formatted as "<timestamp> - <level> - <message>".
_LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module-level logger used by the code below.
logger = logging.getLogger(__name__)


class BSDetector:
    """
    Implementation of BSDetector algorithm using a zero-shot classification pipeline
    """

    def __init__(
        self,
        llm_name="qwen2.5:0.5b",
        use_nli=True,
        nli_model_name="cross-encoder/nli-deberta-v3-small",
        alpha=0.7,
        beta=0.5,
        num_samples=5,
        temperature=1.0,
    ):
        """
        Store the scoring parameters and load the NLI pipeline.

        Args:
            llm_name: Ollama model name sent with every generation request
            use_nli: Not read by this constructor; the NLI pipeline is loaded
                regardless of its value
            nli_model_name: HuggingFace model handed to the pipeline factory
            alpha: Weight of the NLI score vs exact match (paper's α)
            beta: Weight of observed consistency vs self-reflection (paper's β)
            num_samples: How many answers are sampled for observed
                consistency (paper's k)
            temperature: Sampling temperature used for those extra answers
        """
        # Weights and sampling settings
        self.alpha, self.beta = alpha, beta
        self.num_samples, self.temperature = num_samples, temperature

        # Where and with which model generation requests are made
        self.llm_name = llm_name
        self.api_base = "http://localhost:11434/api"

        # Loading the pipeline is bracketed by two log lines.
        logger.info(f"Loading NLI pipeline with model: {nli_model_name}")
        nli = pipeline("zero-shot-classification", model=nli_model_name)
        self.nli_pipeline = nli
        logger.info("NLI pipeline loaded successfully")

    def _generate_response(self, prompt, temperature=0.0, timeout=300.0):
        """
        Send one non-streaming generation request to Ollama and return its text.

        Args:
            prompt: Prompt text sent to the model
            temperature: Sampling temperature for this request
            timeout: Seconds to wait for the HTTP response before giving up;
                None disables the limit

        Returns:
            The "response" field of the reply, or "" when the body parses but
            has no such field (or nothing can be recovered from it).

        Raises:
            Exception: Any error from the HTTP call or from handling the
                reply is logged with its traceback and re-raised unchanged.
        """
        try:
            r = requests.post(
                f"{self.api_base}/generate",
                json={
                    "model": self.llm_name,
                    "prompt": prompt,
                    # Ollama takes sampling parameters inside "options"; a
                    # top-level "temperature" key is not part of the request
                    # schema, so the requested temperature was never applied.
                    "options": {"temperature": temperature},
                    "stream": False,
                },
                headers={"Content-Type": "application/json"},
                # Without a timeout a stalled server blocks the caller forever.
                timeout=timeout,
            )
            r.raise_for_status()

            # Try the first line of the body as JSON, then the whole body,
            # and as a last resort pull the field out with a regex.
            text = r.text.splitlines()[0].strip()
            try:
                data = json.loads(text)
            except json.JSONDecodeError:
                try:
                    data = r.json()
                except ValueError:
                    m = re.search(r'"response":"([^"]+)"', r.text)
                    return m.group(1) if m else ""

            return data.get("response", "")

        except Exception:
            logger.exception("Error calling Ollama API")
            raise

    def _nli_contradiction_score(self, reference: str, generated: str) -> float:
        """
        Use NLI pipeline to calculate contradiction probability between two texts.

        Returns:
            1 - contradiction_probability (higher means less similar)
        """

        # Define contradiction and entailment as classes
        labels = ["contradiction", "entailment"]

        # Check both directions to mitigate positional bias
        # Direction 1: reference -> generated
        result1 = self.nli_pipeline(
            reference, candidate_labels=labels, hypothesis=generated
        )

        # Direction 2: generated -> reference
        result2 = self.nli_pipeline(
            generated, candidate_labels=labels, hypothesis=reference
        )

        # Extract contradiction probabilities
        contradiction_idx1 = result1["labels"].index("contradiction")
        contradiction_prob1 = result1["scores"][contradiction_idx1]

        contradiction_idx2 = result2["labels"].index("contradiction")
        contradiction_prob2 = result2["scores"][contradiction_idx2]

        # Average the contradiction probabilities from both directions
        avg_contradiction_prob = (contradiction_prob1 + contradiction_prob2) / 2

        # Return 1 - contradiction for similarity score (higher means more similar)
        return 1.0 - avg_contradiction_prob

    def _exact_match(self, text1: str, text2: str) -> float:
        """Calculate exact match (indicator function r_i in paper)."""
        return 1.0 if text1.strip() == text2.strip() else 0.0

    def _compute_similarity(self, reference: str, generated: str) -> float:
        """
        Compute similarity combining NLI/fallback and exact match.

        Formula from paper: o_i = α*s_i + (1-α)*r_i
        """
        sim = self._nli_contradiction_score(reference, generated)
        exact_match = self._exact_match(reference, generated)

        # Combine with alpha weight (o_i in paper)
        # o_i = α*s_i + (1-α)*r_i
        similarity = self.alpha * sim + (1 - self.alpha) * exact_match

        return similarity

    def observed_consistency(self, question: str, reference_answer: str) -> float:
        """
        Measure observed consistency by generating multiple answers.

        Paper formula: O = (1/k) * Σo_i
        """
        # Template for Chain-of-Thought prompting
        cot_template = f"""Please strictly use the following template to provide answer:
explanation: [insert step-by-step analysis], answer: [provide your answer]

Question: {question}"""

        similarities = []
        for i in range(self.num_samples):
            try:
                # Generate response with temperature sampling
                response = self._generate_response(
                    cot_template, temperature=self.temperature
                )
                print(f"answer {i}: {response}")

                # Compute similarity (o_i in paper)
                similarity = self._compute_similarity(reference_answer, response)
                similarities.append(similarity)

            except Exception as e:
                logger.error(f"Error in observed consistency: {e}")
                # In case of failure, add a neutral value that won't skew the results
                similarities.append(0.5)

        # Return average similarity as observed consistency (O in paper)
        # O = (1/k) * Σo_i
        return np.mean(similarities) if similarities else 0.5

    def self_reflection_certainty(self, question: str, reference_answer: str) -> float:
        """
        Measure self-reflection certainty by asking the model to evaluate its answer.

        Paper formula: S = (score_1 + score_2 + ... + score_n) / n
        """
        # First reflection prompt
        reflection_prompt1 = f"""Question: {question}, Proposed Answer: {reference_answer}
Is the proposed answer: (A) Correct (B) Incorrect (C) I am not sure.
The output should strictly use the following template:
explanation: [insert analysis], answer: [choose one letter from among choices A through C]"""

        # Second reflection prompt
        reflection_prompt2 = f"""Question: {question}, Proposed Answer: {reference_answer}
Are you really sure the proposed answer is correct?
Choose again: (A) Correct (B) Incorrect (C) I am not sure.
The output should strictly use the following template:
explanation: [insert analysis], answer: [choose one letter from among choices A through C]"""

        try:
            # Get responses
            reflection1 = self._generate_response(reflection_prompt1)
            reflection2 = self._generate_response(reflection_prompt2)

            # Extract answers (simplified parsing)
            def extract_letter(reflection):
                if "answer:" in reflection.lower():
                    letter = reflection.lower().split("answer:")[1].strip()[0]
                    if letter in "abc":
                        return letter
                # Fallback to searching for the letter
                for letter in "abc":
                    if f"({letter.upper()})" in reflection:
                        return letter
                return "c"  # Default to "not sure" if parsing fails

            letter1 = extract_letter(reflection1)
            letter2 = extract_letter(reflection2)

            # Convert to numerical values: A=1.0, B=0.0, C=0.5
            values = {"a": 1.0, "b": 0.0, "c": 0.5}
            score1 = values.get(letter1, 0.5)
            score2 = values.get(letter2, 0.5)

            # Return average score (S in paper)
            # S = (score_1 + score_2) / 2
            return (score1 + score2) / 2

        except Exception as e:
            logger.error(f"Error in self-reflection: {e}")
            return 0.5  # Neutral value in case of failure

    def compute_confidence(
        self, question: str, answer: Optional[str] = None
    ) -> Tuple[float, str]:
        """
        Score an answer by mixing observed consistency and self-reflection.

        Paper formula: C = β*O + (1-β)*S

        Args:
            question: Question being answered
            answer: Answer to score; when None, one is generated from the
                question with the default temperature

        Returns:
            (confidence, answer) where answer is the scored text.
        """
        scored_answer = answer
        if scored_answer is None:
            # No answer supplied: ask the model for one first
            scored_answer = self._generate_response(question)
            logger.info(f"Generated answer: {scored_answer}")

        # O and S from the paper
        o_score = self.observed_consistency(question, scored_answer)
        s_score = self.self_reflection_certainty(question, scored_answer)

        logger.info(f"Observed consistency: {o_score:.3f}")
        logger.info(f"Self-reflection certainty: {s_score:.3f}")

        # C = β*O + (1-β)*S
        return self.beta * o_score + (1 - self.beta) * s_score, scored_answer

    def select_best_answer(
        self, question: str, num_candidates: int = 3
    ) -> Tuple[str, float]:
        """
        Sample several answers, score each one, and return the top-scoring pair.

        Args:
            question: Question to answer
            num_candidates: How many candidate answers to sample

        Returns:
            (answer, confidence) of the highest-confidence candidate (the
            earliest one on ties), or a fixed apology with confidence 0.0
            when no candidate could be produced and scored.
        """
        logger.info(f"Generating {num_candidates} candidate answers...")

        # (answer, confidence) pairs for every candidate that succeeded
        scored = []
        for attempt in range(1, num_candidates + 1):
            try:
                text = self._generate_response(
                    question, temperature=self.temperature
                )
                logger.info(f"Candidate {attempt}: {text[:100]}...")

                score, _ = self.compute_confidence(question, text)
                scored.append((text, score))

                logger.info(f"Confidence for candidate {attempt}: {score:.3f}")

            except Exception as e:
                logger.error(f"Error generating candidate {attempt}: {e}")

        # Nothing usable: hand back the default response
        if len(scored) == 0:
            return "I'm unable to provide a confident answer at this time.", 0.0

        # Keep the first pair whose confidence is strictly the greatest
        winner = scored[0]
        for pair in scored[1:]:
            if pair[1] > winner[1]:
                winner = pair

        logger.info(f"Selected best answer with confidence: {winner[1]:.3f}")
        return winner


if __name__ == "__main__":
    print("Initializing BSDetector with zero-shot classification pipeline...")

    # Detector configuration used by all three demos
    detector_settings = dict(
        llm_name="qwen2.5:0.5b",
        use_nli=True,
        alpha=0.7,
        beta=0.5,
        num_samples=5,
        temperature=0.88888888,
    )
    detector = BSDetector(**detector_settings)

    # Examples 1 and 2: the same question scored with a correct and a wrong answer
    fixed_answer_demos = [
        ("What is the capital of France?", "Paris"),
        ("What is the capital of France?", "London"),
    ]
    for demo_question, demo_answer in fixed_answer_demos:
        print(f"\nQuestion: {demo_question}")
        print(f"Answer: {demo_answer}")
        demo_confidence, _ = detector.compute_confidence(demo_question, demo_answer)
        print(f"Confidence: {demo_confidence:.3f}")

    # Example 3: let the detector pick the best of two sampled candidates
    open_question = "What is the distance from Earth to the Moon in kilometers?"

    print(f"\nQuestion: {open_question}")
    best_answer, best_confidence = detector.select_best_answer(
        open_question, num_candidates=2
    )
    print(f"Best answer: {best_answer}")
    print(f"Confidence: {best_confidence:.3f}")


2025-05-16 07:30:44,603 - INFO - Loading NLI pipeline with model: cross-encoder/nli-deberta-v3-small


Initializing BSDetector with zero-shot classification pipeline...


Device set to use mps:0
2025-05-16 07:30:45,631 - INFO - NLI pipeline loaded successfully



Question: What is the capital of France?
Answer: Paris


2025-05-16 07:30:57,913 - INFO - Observed consistency: 0.624
2025-05-16 07:30:57,914 - INFO - Self-reflection certainty: 0.500


Confidence: 0.562

Question: What is the capital of France?
Answer: London


2025-05-16 07:31:04,799 - INFO - Observed consistency: 0.581
2025-05-16 07:31:04,800 - INFO - Self-reflection certainty: 0.750
2025-05-16 07:31:04,801 - INFO - Generating 2 candidate answers...


Confidence: 0.665

Question: What is the distance from Earth to the Moon in kilometers?


2025-05-16 07:31:05,107 - INFO - Candidate 1: The distance from Earth to the Moon (the lunar orbit) is approximately 384,400 kilometers or about 2...
2025-05-16 07:31:12,230 - INFO - Observed consistency: 0.589
2025-05-16 07:31:12,232 - INFO - Self-reflection certainty: 0.000
2025-05-16 07:31:12,233 - INFO - Confidence for candidate 1: 0.295
2025-05-16 07:31:12,659 - INFO - Candidate 2: The distance from Earth to the Moon is approximately 384,400 kilometers.

This value can vary slight...
2025-05-16 07:31:19,221 - INFO - Observed consistency: 0.644
2025-05-16 07:31:19,223 - INFO - Self-reflection certainty: 0.750
2025-05-16 07:31:19,223 - INFO - Confidence for candidate 2: 0.697
2025-05-16 07:31:19,224 - INFO - Selected best answer with confidence: 0.697


Best answer: The distance from Earth to the Moon is approximately 384,400 kilometers.

This value can vary slightly depending on whether we're talking about the actual distance between the two planets or if it's a hypothetical calculation based on current astronomical knowledge.
Confidence: 0.697


In [None]:
# TODO: the NLI implementation is wrong: we don't want to compare the prompt to the answer, but rather compare different prompts (refer to the paper)
# TODO next action, print candidates of COT, print results of each self reflection, print results of NLI
# TODO second action: better understand the confidence score calculation