# Diversity Testbench

---
---

## Ember Package Testing (WIP)

### Setup Dependencies

In [18]:
import openai

In [28]:
openai.models.list().model_dump()

{'data': [{'id': 'gpt-4.5-preview',
   'created': 1740623059,
   'object': 'model',
   'owned_by': 'system'},
  {'id': 'gpt-4.5-preview-2025-02-27',
   'created': 1740623304,
   'object': 'model',
   'owned_by': 'system'},
  {'id': 'gpt-4o-mini-2024-07-18',
   'created': 1721172717,
   'object': 'model',
   'owned_by': 'system'},
  {'id': 'gpt-4o-mini-audio-preview-2024-12-17',
   'created': 1734115920,
   'object': 'model',
   'owned_by': 'system'},
  {'id': 'dall-e-3',
   'created': 1698785189,
   'object': 'model',
   'owned_by': 'system'},
  {'id': 'dall-e-2',
   'created': 1698798177,
   'object': 'model',
   'owned_by': 'system'},
  {'id': 'gpt-4o-audio-preview-2024-10-01',
   'created': 1727389042,
   'object': 'model',
   'owned_by': 'system'},
  {'id': 'gpt-4o-audio-preview',
   'created': 1727460443,
   'object': 'model',
   'owned_by': 'system'},
  {'id': 'gpt-4o-mini-realtime-preview-2024-12-17',
   'created': 1734112601,
   'object': 'model',
   'owned_by': 'system'},
  {'

In [5]:
# %pip install -q -e ../../..

In [6]:
import logging, sys, os
from typing import Dict, Any, List

In [None]:
os.environ["OPENAI_API_KEY"] = ''

In [16]:
openai_key = os.getenv("OPENAI_API_KEY")

In [10]:
# fixing dependencies if current path is <root>/src/ember/examples/diversity_testbench.ipynb
project_root = os.path.abspath(os.path.join(os.getcwd(), "../"))
if project_root not in sys.path:
    sys.path.insert(0, project_root)

### Ember Repo Loads (WIP)

In [11]:
# from ember.core.registry.model.model_module.lm import LMModule, LMModuleConfig
from ember.core.registry.model.config.settings import initialize_ember
from ember.core.registry.model.base.services.model_service import ModelService
from ember.core.registry.model.base.schemas.model_info import ModelInfo
from ember.core.registry.model.base.schemas.cost import ModelCost, RateLimit
from ember.core.registry.model.base.schemas.provider_info import ProviderInfo


In [12]:
model_registry = initialize_ember()
print(model_registry.list_models())
llm = ModelService(registry=model_registry)

[]


In [13]:
# Register an OpenAI GPT-4o model
openai_info = ModelInfo(
    model_id="openai:gpt-4o",
    model_name="gpt-4o",
    cost=ModelCost(input_cost_per_thousand=0.03, output_cost_per_thousand=0.06),
    rate_limit=RateLimit(tokens_per_minute=80000, requests_per_minute=5000),
    provider=ProviderInfo(name="OpenAI", default_api_key=openai_key),
    api_key=openai_key,
)
model_registry.register_model(openai_info)

ValidationError: 2 validation errors for ModelInfo
id
  Field required [type=missing, input_value={'model_id': 'openai:gpt-...O3D93O0JF2l7JSHiKmgg4A'}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.10/v/missing
name
  Field required [type=missing, input_value={'model_id': 'openai:gpt-...O3D93O0JF2l7JSHiKmgg4A'}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.10/v/missing

In [15]:
response = llm(prompt="Hello!", model_id="openai:gpt-4o")
print(response.data)

ModelNotFoundError: Model 'openai:gpt-4o' not found. Available models:
- 

---
---

## Neural Similarity Scoring - Cosine Similarity

from `src/ember/core/utils/embedding_utils.py`

In [21]:
%pip install -q openai

Note: you may need to restart the kernel to use updated packages.


In [20]:
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import List, Protocol
import math

import openai
import os


################################################################
# 1) Embedding Model Interfaces & Implementations
################################################################


class EmbeddingModel(Protocol):
    """Interface for embedding models.

    This protocol defines the minimal interface required to compute a text
    embedding. Implementations may use local models, external APIs, or custom
    neural networks.

    Methods:
        embed_text: Compute the embedding for a given text.
    """

    def embed_text(self, text: str) -> List[float]:
        """Computes the embedding vector for the provided text.

        Args:
            text (str): The text to be embedded.

        Returns:
            List[float]: A list of floats representing the embedding vector.
        """
        ...

class Text_Embedding_3_EmbeddingModel(Protocol):
    """Interface for embedding models.

    This protocol defines the minimal interface required to compute a text
    embedding. Implementations may use local models, external APIs, or custom
    neural networks.

    Methods:
        embed_text: Compute the embedding for a given text.
    """

    def __init__(self, api_key: str = None):
        """Initializes the embedding model with the OpenAI API key.

        Args:
            api_key (str): OpenAI API key for authentication.
        """
        self.api_key = api_key or os.environ.get("OPENAI_API_KEY")
        if not self.api_key:
            raise ValueError("OpenAI API key must be provided or set in the environment variable OPENAI_API_KEY.")
        openai.api_key = self.api_key

    def embed_text(self, text: str) -> List[float]:
        """Computes the embedding vector for the provided text.

        Args:
            text (str): The text to be embedded.

        Returns:
            List[float]: A list of floats representing the embedding vector.
        """
        response = openai.Embedding.create(
            model="text-embedding-3",
            input=text
        )
        return response["data"][0]["embedding"]


class MockEmbeddingModel:
    """Mock implementation of an embedding model using naive ASCII encoding.

    This simple model converts each character in the text to a normalized ASCII
    value. It is intended solely for demonstration and testing purposes.

    Methods:
        embed_text: Converts text to a sequence of normalized ASCII values.
    """

    def embed_text(self, text: str) -> List[float]:
        """Embeds text by converting each character to its normalized ASCII code.

        Args:
            text (str): The input text to be embedded.

        Returns:
            List[float]: A list of floats representing the embedding. Returns an
            empty list if the text is empty.
        """
        if not text:
            return []
        return [ord(ch) / 256.0 for ch in text]


################################################################
# 2) Similarity Metric Interface & Implementations
################################################################


class SimilarityMetric(ABC):
    """Abstract base class for computing similarity between embedding vectors.

    Subclasses must implement the similarity method to calculate a similarity
    score between two vectors.
    """

    @abstractmethod
    def similarity(self, vec_a: List[float], vec_b: List[float]) -> float:
        """Calculates the similarity between two embedding vectors.

        Args:
            vec_a (List[float]): The first embedding vector.
            vec_b (List[float]): The second embedding vector.

        Returns:
            float: The similarity score, typically in the range [0, 1] or [-1, 1].
        """
        ...


class CosineSimilarity(SimilarityMetric):
    """Implementation of cosine similarity for embedding vectors.

    The cosine similarity is defined as:
        similarity(a, b) = (a · b) / (||a|| * ||b||)

    Returns 0.0 if either vector is empty or if any vector's norm is zero.
    """

    def similarity(self, vec_a: List[float], vec_b: List[float]) -> float:
        """Computes cosine similarity between two embedding vectors.

        Args:
            vec_a (List[float]): The first embedding vector.
            vec_b (List[float]): The second embedding vector.

        Returns:
            float: The cosine similarity score.
        """
        if not vec_a or not vec_b:
            return 0.0

        dot_product: float = sum(a * b for a, b in zip(vec_a, vec_b))
        norm_a: float = math.sqrt(sum(a * a for a in vec_a))
        norm_b: float = math.sqrt(sum(b * b for b in vec_b))
        if norm_a == 0 or norm_b == 0:
            return 0.0

        return dot_product / (norm_a * norm_b)


################################################################
# 3) High-Level Utility Function
################################################################


def calculate_text_similarity(
    text1: str, text2: str, model: EmbeddingModel, metric: SimilarityMetric
) -> float:
    """Calculates text similarity using an embedding model and a similarity metric.

    This function generates embeddings for the provided texts and then computes a
    similarity score using the given similarity metric.

    Args:
        text1 (str): The first text string.
        text2 (str): The second text string.
        model (EmbeddingModel): An instance conforming to the embedding model interface.
        metric (SimilarityMetric): An instance implementing a similarity metric.

    Returns:
        float: The computed similarity score.
    """
    embedding1: List[float] = model.embed_text(text=text1)
    embedding2: List[float] = model.embed_text(text=text2)
    return metric.similarity(vec_a=embedding1, vec_b=embedding2)


################################################################
# 4) Example Usage (Executable as Script)
################################################################
if __name__ == "__main__":
    mock_model: MockEmbeddingModel = MockEmbeddingModel()
    cosine: CosineSimilarity = CosineSimilarity()

    text_a: str = "Hello world!"
    text_b: str = "Hello, world??"

    score: float = calculate_text_similarity(
        text1=text_a, text2=text_b, model=mock_model, metric=cosine
    )
    print(f"Similarity between '{text_a}' and '{text_b}': {score}")

Similarity between 'Hello world!' and 'Hello, world??': 0.9150491464734943


---
---

## Compression Ratio (WIP)

from `src/ember/core/utils/eval/evaluators.py`

In [27]:
%pip install -q diversity

Note: you may need to restart the kernel to use updated packages.


In [28]:
from __future__ import annotations

import re
import subprocess
from typing import Any, Dict, TypeVar, Optional, List, Generic, Callable, Union

from .base_evaluator import IEvaluator, EvaluationResult
from .extractors import RegexExtractor

from diversity import compression_ratio

T_out = TypeVar("T_out")
T_truth = TypeVar("T_truth")


class ComposedEvaluator(IEvaluator[T_out, T_truth], Generic[T_out, T_truth]):
    """Combines an output extractor with an evaluator for the extracted data.

    This evaluator first transforms the system output using the provided extractor,
    then evaluates the extracted value using the specified base evaluator.

    Args:
        extractor: An object with an `extract` method to process the system output.
        base_evaluator (IEvaluator): An evaluator that processes the extracted output.

    Returns:
        EvaluationResult: The result of the evaluation.
    """

    def __init__(
        self,
        extractor: Any,  # Expecting an extractor with an `extract` method.
        base_evaluator: IEvaluator[Any, Any],
    ) -> None:
        self.extractor = extractor
        self.base_evaluator = base_evaluator

    def evaluate(
        self, system_output: T_out, correct_answer: Any, **kwargs: Any
    ) -> EvaluationResult:
        """Evaluates the provided system output against the correct answer.

        Args:
            system_output (T_out): The raw output generated by the system.
            correct_answer (Any): The expected correct answer.
            **kwargs: Additional keyword arguments for extraction or evaluation.

        Returns:
            EvaluationResult: The result of evaluating the extracted value.
        """
        extracted_value = self.extractor.extract(system_output, **kwargs)
        return self.base_evaluator.evaluate(extracted_value, correct_answer, **kwargs)


# Basic Evaluators


class ExactMatchEvaluator(IEvaluator[str, str]):
    """Evaluator to check for an exact match between two strings,
    ignoring differences in whitespace and case.

    Example:
        evaluator = ExactMatchEvaluator()
        result = evaluator.evaluate("Hello World", "hello   world")

    Args:
        compare_fn (Optional[Callable[[str, str], bool]]): Optional custom comparison function.
            If not provided, strings are normalized (whitespace removed, lowercase) before comparison.

    Returns:
        EvaluationResult: The result containing a correctness flag and a score.
    """

    def __init__(self, compare_fn: Optional[Callable[[str, str], bool]] = None) -> None:
        self.compare_fn = compare_fn or self._default_compare

    def _default_compare(self, str1: str, str2: str) -> bool:
        """Default string comparison function that ignores case and whitespace.

        Args:
            str1 (str): First string to compare
            str2 (str): Second string to compare

        Returns:
            bool: True if strings match after normalization
        """
        return str1.strip().lower() == str2.strip().lower()

    def evaluate(
        self, system_output: str, correct_answer: str, **kwargs: Any
    ) -> EvaluationResult:
        """Evaluates whether a system output exactly matches the correct answer.

        Args:
            system_output (str): The system-generated string.
            correct_answer (str): The expected answer string.
            **kwargs: Additional keyword arguments (unused).

        Returns:
            EvaluationResult: An object with `is_correct` set to True if the normalized strings match,
                              along with a corresponding score.
        """
        is_correct = self.compare_fn(system_output, correct_answer)
        score = 1.0 if is_correct else 0.0
        return EvaluationResult(is_correct=is_correct, score=score)


class NumericToleranceEvaluator(IEvaluator[float, float]):
    """Evaluator to check if a numeric output is within a specified tolerance of the expected value.

    Example:
        evaluator = NumericToleranceEvaluator(tolerance=0.05)
        result = evaluator.evaluate(3.14159, 3.14)

    Args:
        tolerance (float): The maximum allowed difference between the output and the correct value.
    """

    def __init__(self, tolerance: float = 0.01) -> None:
        self.tolerance = tolerance

    def evaluate(
        self, system_output: float, correct_answer: float, **kwargs: Any
    ) -> EvaluationResult:
        """Evaluates the numeric system output against the correct value within a specified tolerance.

        Args:
            system_output (float): The numeric output from the system.
            correct_answer (float): The expected numeric answer.
            **kwargs: Additional keyword arguments (unused).

        Returns:
            EvaluationResult: The result including a correctness flag, score, and metadata about the difference.
        """
        difference = abs(system_output - correct_answer)
        # Round to handle floating point precision issues
        rounded_diff = round(difference, 8)
        is_correct = rounded_diff <= self.tolerance
        base = abs(correct_answer) if correct_answer != 0 else 1.0
        score = max(0.0, 1.0 - rounded_diff / base)
        return EvaluationResult(
            is_correct=is_correct, score=score, metadata={"diff": rounded_diff}
        )


class CodeExecutionEvaluator(IEvaluator[str, str]):
    """Evaluator that executes Python code and compares its standard output to an expected result.

    **WARNING**: Executing arbitrary code is dangerous.
    Only use this evaluator with fully trusted code strings.

    Args:
        timeout (float): Maximum duration (in seconds) to allow code execution.
    """

    def __init__(self, timeout: float = 5.0) -> None:
        self.timeout = timeout

    def evaluate(
        self, system_output: str, correct_answer: str, **kwargs: Any
    ) -> EvaluationResult:
        """Executes the provided Python code and compares its standard output to the expected result.

        Args:
            system_output (str): A Python code string to be executed.
            correct_answer (str): The expected output from the code execution.
            **kwargs: Additional keyword arguments (unused).

        Returns:
            EvaluationResult: The result of execution, including stdout, stderr, and exit code in metadata.
        """
        try:
            process_result: subprocess.CompletedProcess = subprocess.run(
                args=["python", "-c", system_output],
                capture_output=True,
                text=True,
                timeout=self.timeout,
            )
            stdout_str = process_result.stdout.strip()
            expected_str = correct_answer.strip()
            is_correct = stdout_str == expected_str
            return EvaluationResult(
                is_correct=is_correct,
                score=1.0 if is_correct else 0.0,
                metadata={
                    "stdout": process_result.stdout,
                    "stderr": process_result.stderr,
                    "exit_code": process_result.returncode,
                },
            )
        except subprocess.TimeoutExpired as timeout_error:
            return EvaluationResult(
                is_correct=False,
                score=0.0,
                metadata={"error": f"TimeoutExpired: {str(timeout_error)}"},
            )
        except Exception as error:
            return EvaluationResult(
                is_correct=False,
                score=0.0,
                metadata={"error": f"{type(error).__name__}: {str(error)}"},
            )

class DiversityScoringEvaluator(IEvaluator[List[str], None]):
    """
    Evaluator to test ensemble outputs -> score them (float)
    """
    def evaluate(
            self, 
            system_output: List[str], 
            **kwargs) -> EvaluationResult:
        if system_output is None or len(system_output) == 0:
            return EvaluationResult(is_correct=False, score=-1)

        # current compression ratio formula
        # TODO: update scoring function to make it better
        # -> like use token count

        # example I was thinking about:
        # letter_sum = sum(len(response) for response in system_output)
        # ratio = compression_ratio(system_output) * min(1, len(system_output)/5) * min(1, letter_sum/100)
        ratio = compression_ratio(system_output, algorithm='gzip',verbose=True)
        return EvaluationResult(is_correct=True,score=ratio,metadata = {'responses': system_output})

# Composite Evaluator Example


class PartialRegexEvaluator(ComposedEvaluator[str, str]):
    """Evaluator that uses a regex extractor followed by an exact match evaluation.

    First, it extracts a substring using a regular expression, then checks if the extracted
    value matches the expected answer exactly.

    Args:
        pattern (str): The regular expression pattern used for extraction.
    """

    def __init__(self, pattern: str) -> None:
        extractor = RegexExtractor(pattern)
        evaluator = ExactMatchEvaluator()
        super().__init__(extractor=extractor, base_evaluator=evaluator)


if __name__ == "__main__":
    # Example 1: Direct final-output comparison (exact match)
    exact_evaluator = ExactMatchEvaluator()
    result_exact = exact_evaluator.evaluate("Hello World", "hello  world")
    print("ExactMatch result:", result_exact)

    # Example 2: Numeric tolerance evaluation
    numeric_evaluator = NumericToleranceEvaluator(tolerance=0.05)
    result_numeric = numeric_evaluator.evaluate(3.14159, 3.14)
    print("NumericTolerance result:", result_numeric)

    # Example 3: Composite evaluator with regex extraction and exact matching.
    regex_pattern = r"answer\s+is\s+(\w+)"
    partial_regex_evaluator = PartialRegexEvaluator(pattern=regex_pattern)
    result_regex = partial_regex_evaluator.evaluate("The answer is PARIS", "PARIS")
    print("PartialRegexEvaluator result:", result_regex)

    # Example 4: Code execution evaluator.
    code_evaluator = CodeExecutionEvaluator()
    code_string = "print('Hello')"
    result_code = code_evaluator.evaluate(code_string, "Hello")
    print("CodeExecutionEvaluator result:", result_code)

    #TODO Example 5: Diversity Scoring evaluator.
    diversity_evaluator = DiversityScoringEvaluator()
    # input_strs = ["hi there", "hi", "hello", "yo whatup"]
    input_strs = ["This is a sample text with lots of repetition.", 
                  "This is a sample text with lots of repetition.",
                  "This is a sample text with lots of repetition."]
    result_diversity = diversity_evaluator.evaluate(input_strs)
    print("DiversityScoringEvaluator result:", result_diversity)

ImportError: attempted relative import with no known parent package

---
---
## Potential other cases to explore
-  Edit distance
- ensembling all "diversity" related metrics
- combination of validation/hallucination metric + ensembled diversity metric -> score