In [2]:
%pip install --upgrade pip
%pip install --disable-pip-version-check torch
%pip install transformers sentence-transformers

%pip install -q -U bitsandbytes==0.45.3
%pip install pandas



In [3]:
from transformers import AutoTokenizer, AutoModelForCausalLM
from sentence_transformers import SentenceTransformer, util
import torch

import re
import itertools
import random
import pandas as pd
from tqdm import tqdm
import math

In [None]:
dataset = [
    {
        "question": "What is a variable in programming?",
        "answer": "A variable is a named memory location used to store data that can be read or modified during program execution.",
    },
    {
        "question": "What is the difference between a list and an array?",
        "answer": "A list can grow or shrink dynamically, while an array usually has a fixed size and often stores elements of the same data type.",
    },
    {
        "question": "What is a for loop used for?",
        "answer": "A for loop is used to repeat a block of code a specific number of times or to iterate over a collection of elements.",
    },
    {
        "question": "What is a function?",
        "answer": "A function is a reusable block of code designed to perform a specific task, which can accept parameters and optionally return a result.",
    },
    {
        "question": "What is an API?",
        "answer": "An API (Application Programming Interface) defines rules and methods that allow different software applications to communicate with each other.",
    },
    {
        "question": "What does it mean to debug a program?",
        "answer": "Debugging is the process of identifying, analyzing, and fixing errors or unexpected behavior in a program.",
    },
    {
        "question": "What is an exception?",
        "answer": "An exception is an error or unexpected event that occurs during program execution and can disrupt the normal flow of the program.",
    },
    {
        "question": "Difference between == and === in JavaScript?",
        "answer": "== compares values after performing type conversion, while === compares both value and type without any conversion.",
    },
    {
        "question": "What is an algorithm?",
        "answer": "An algorithm is a well-defined, step-by-step procedure used to solve a problem or perform a computation.",
    },
    {
        "question": "What does open source mean?",
        "answer": "Open source software has source code that is publicly available and can be studied, modified, and distributed by anyone.",
    },
    {
        "question": "What is an object in OOP?",
        "answer": "An object is an instance of a class that contains data (attributes) and behavior (methods) related to that data.",
    },
    {
        "question": "What is Git used for?",
        "answer": "Git is a version control system used to track changes in source code and collaborate efficiently with other developers.",
    },
    {
        "question": "What is an infinite loop?",
        "answer": "An infinite loop is a loop that runs indefinitely because its stopping condition is never met or never changes.",
    },
    {
        "question": "What is an SQL SELECT query?",
        "answer": "A SELECT query is used to retrieve specific data from one or more tables in a database.",
    },
    {
        "question": "What do frontend and backend mean?",
        "answer": "Frontend refers to the user-facing part of an application, while backend handles server-side logic, databases, and application processing.",
    },
]

In [11]:
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {DEVICE}")


EMBEDDING_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
# MODEL_NAME = "unsloth/Phi-3-mini-4k-instruct-bnb-4bit"
MODEL_NAME = "Gensyn/Qwen2.5-1.5B-Instruct"
EMBEDDING_MODEL = SentenceTransformer(EMBEDDING_MODEL_NAME)
TOKENIZER = AutoTokenizer.from_pretrained(MODEL_NAME, use_fast=True)
LLM_MODEL = AutoModelForCausalLM.from_pretrained(MODEL_NAME)
LLM_MODEL.to(DEVICE)
_ = LLM_MODEL.eval()

# print LLM_MODEL device
print(f"LLM_MODEL device: {next(LLM_MODEL.parameters()).device}")

Using device: cuda


tokenizer_config.json: 0.00B [00:00, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

config.json:   0%|          | 0.00/660 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/3.09G [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/242 [00:00<?, ?B/s]

LLM_MODEL device: cuda:0


In [16]:
class Data:
    """
    Simple container for a question/answer pair.

    Attributes
    ----------
    question : str
        The question text.
    answer : str
        The answer text.

    Parameters
    ----------
    data : dict
        A mapping with keys "question" and "answer".
    """

    def __init__(self, data: dict):
        self.question = data["question"]
        self.answer = data["answer"]

    def __str__(self):
        """Return a string representation of the Data object."""
        return f"Data(question={self.question}, answer={self.answer})"

    def __eq__(self, other):
        """
        Check equality between two Data objects.

        Parameters
        ----------
        other : Data
            Another Data object to compare with.

        Returns
        -------
        bool
            True if both question and answer match, NotImplemented if other is not a Data.
        """
        if not isinstance(other, Data):
            return NotImplemented
        return self.question == other.question and self.answer == other.answer


class Dataset:
    """
    Collection of Data objects with convenient accessors.

    Parameters
    ----------
    data : list[dict]
        Iterable of dictionaries, each containing "question" and "answer".

    Attributes
    ----------
    data : list[Data]
        List of Data objects created from input dictionaries.
    """

    def __init__(self, data: list):
        self.data = [Data(entry) for entry in data]

    def get_random_entry(self):
        """Return a single random Data object from the dataset."""
        return random.choice(self.data)

    def get_all_entries(self):
        """Return the list of all Data objects."""
        return self.data

    def generate_random_entries(
        self, count: int = 1, forbiden_entry: list[Data] = []
    ) -> list[Data]:
        """
        Generate a list of unique random Data entries, excluding forbidden ones.

        Parameters
        ----------
        count : int, optional (default=1)
            Number of random entries to generate.
        forbiden_entry : list[Data], optional (default=[])
            List of Data entries to exclude from selection.

        Returns
        -------
        list[Data]
            A list of randomly selected Data objects.

        Raises
        ------
        AssertionError
            If count is not between 0 and dataset size minus one.
        """
        assert (
            0 <= count <= len(self.data) - 1
        ), "n must be between 0 and the size of the dataset minus one."
        other_entries = random.sample(
            list(itertools.filterfalse(lambda e: e in forbiden_entry, self.data)),
            count,
        )
        return other_entries

    def add_other_entries(
        self,
        forbiden_entry: list[Data],
        other_entries: list[Data],
        number_of_shots: int,
    ) -> list[Data]:
        """
        Add additional random entries to an existing list, avoiding duplicates.

        Parameters
        ----------
        forbiden_entry : list[Data]
            List of Data entries to exclude from selection.
        other_entries : list[Data]
            Existing list of entries to extend.
        number_of_shots : int
            Number of additional entries to add.

        Returns
        -------
        list[Data]
            Combined list of other_entries plus the additional random entries.

        Raises
        ------
        AssertionError
            If number_of_shots is not between 0 and dataset size minus one.
        """
        assert (
            0 <= number_of_shots <= len(self.data) - 1
        ), "n must be between 0 and the size of the dataset minus one."
        additional_entries = random.sample(
            list(
                itertools.filterfalse(
                    lambda e: e in forbiden_entry or e in other_entries, self.data
                )
            ),
            number_of_shots,
        )
        return other_entries + additional_entries

    def build_prompt(self, main_entry: Data, other_entries: list[Data]) -> str:
        """
        Build a formatted prompt string from example entries and a main question.

        Parameters
        ----------
        main_entry : Data
            The main question/answer pair whose question will be asked.
        other_entries : list[Data]
            List of example question/answer pairs to include as few-shot examples.

        Returns
        -------
        str
            Formatted prompt with examples followed by the main question.

        Raises
        ------
        AssertionError
            If main_entry is not part of the dataset.
        """
        assert main_entry in self.data, "main_entry must be part of the dataset"

        return (
            "".join(
                f"QUESTION:\n{e.question}\nANSWER:\n{e.answer}\n\n"
                for e in other_entries
            )
            + f"QUESTION:\n{main_entry.question}\nANSWER:\n"
        )

    def generate_prompt(self, number_of_shots: int = 0) -> tuple[str, Data]:
        """
        Generate a few-shot prompt for the language model.

        Parameters
        ----------
        number_of_shots : int, optional (default=0)
            Number of example question/answer pairs to include before the main question.

        Returns
        -------
        tuple[str, Data]
            A tuple containing:
            - The formatted prompt string with examples and the main question.
            - The main Data entry whose answer should be generated.
        """
        main_entry = self.get_random_entry()
        other_entries = self.generate_random_entries(
            count=number_of_shots, forbiden_entry=[main_entry]
        )
        prompt = self.build_prompt(main_entry, other_entries)
        return prompt, main_entry

    def generate_multiple_prompts(
        self, count=1, number_of_shots=[0]
    ) -> dict[int, list[tuple[str, Data]]]:
        """
        Generate multiple few-shot prompts for different shot counts.

        Parameters
        ----------
        count : int, optional (default=1)
            Number of main entries to generate prompts for.
        number_of_shots : list[int], optional (default=[0])
            List of shot counts to generate prompts for each main entry.

        Returns
        -------
        dict[int, list[tuple[str, Data]]]
            A dictionary mapping each shot count to a list of tuples, where each tuple contains:
            - The formatted prompt string with examples and the main question.
            - The main Data entry whose answer should be generated.
        """
        number_of_shots = sorted(number_of_shots)
        prompts = {}
        for shots in number_of_shots:
            prompts[shots] = []
        main_entries = self.generate_random_entries(count=count)
        for main_entry in main_entries:
            current_other_entries = []
            for shots in number_of_shots:
                if shots > 0:
                    current_other_entries = self.add_other_entries(
                        forbiden_entry=[main_entry],
                        other_entries=current_other_entries,
                        number_of_shots=shots - len(current_other_entries),
                    )
                prompt = self.build_prompt(main_entry, current_other_entries)
                prompts[shots].append((prompt, main_entry))
        return prompts


class Evaluator:
    """
    Hybrid evaluator for assessing model-generated answers.

    The final score combines:
    - embedding cosine similarity (semantic correctness)
    - keyword recall (content coverage)
    - length penalty (conciseness)

    The score is normalized between 0.0 and 1.0.

    Parameters
    ----------
    embedding_model : SentenceTransformer
        The embedding model used for computing semantic similarity.

    Attributes
    ----------
    model : SentenceTransformer
        The embedding model instance.
    """

    def __init__(self, embedding_model):
        self.model = embedding_model

    def _tokenize(self, text: str) -> set:
        """
        Simple word tokenizer that lowercases and removes punctuation.

        Parameters
        ----------
        text : str
            The text to tokenize.

        Returns
        -------
        set
            A set of lowercase word tokens.
        """
        text = text.lower()
        text = re.sub(r"[^\w\s]", "", text)
        return set(text.split())

    def _keyword_score(self, generated: str, reference: str) -> float:
        """
        Compute keyword recall: proportion of reference words present in generated answer.

        Parameters
        ----------
        generated : str
            The generated answer text.
        reference : str
            The reference answer text.

        Returns
        -------
        float
            Recall score between 0.0 and 1.0.
        """
        ref_tokens = self._tokenize(reference)
        gen_tokens = self._tokenize(generated)

        if not ref_tokens:
            return 0.0

        return len(ref_tokens & gen_tokens) / len(ref_tokens)

    def _length_score(self, generated: str, reference: str) -> float:
        """
        Penalize excessive verbosity using a smooth ratio-based score.

        Score is 1.0 when lengths match, decreases otherwise.

        Parameters
        ----------
        generated : str
            The generated answer text.
        reference : str
            The reference answer text.

        Returns
        -------
        float
            Length penalty score between 0.0 and 1.0.
        """
        gen_len = max(len(generated.split()), 1)
        ref_len = max(len(reference.split()), 1)

        ratio = gen_len / ref_len

        # Symmetric penalty: perfect at ratio=1
        return math.exp(-abs(math.log(ratio)))

    def evaluate(self, generated_answer: str, reference_answer: str) -> float:
        """
        Compute the hybrid evaluation score.

        Parameters
        ----------
        generated_answer : str
            The model-generated answer.
        reference_answer : str
            The reference/expected answer.

        Returns
        -------
        float
            Final weighted score combining embedding similarity (50%),
            keyword recall (30%), and length penalty (20%).
        """
        # Embedding similarity
        emb_gen = self.model.encode(generated_answer, convert_to_tensor=True)
        emb_ref = self.model.encode(reference_answer, convert_to_tensor=True)
        embedding_score = util.cos_sim(emb_gen, emb_ref).item()

        # Keyword recall
        # keyword_score = self._keyword_score(generated_answer, reference_answer)

        # Length penalty
        # length_score = self._length_score(generated_answer, reference_answer)

        # Weighted final score
        # final_score = 0.5 * embedding_score + 0.3 * keyword_score + 0.2 * length_score
        final_score = embedding_score
        return float(final_score)


evaluator = Evaluator(EMBEDDING_MODEL)
ds = Dataset(data=dataset)
# print(
#     evaluator.evaluate(
#         "J'ai un chats dans ma maison.",
#         "Je possède un chat à la maison.",
#     )
# )
res = ds.generate_multiple_prompts(count=2, number_of_shots=[0, 2, 4])
for k, v in res.items():
    print(f"Number of shots: {k}")
    for prompt, main_entry in v:
        print("Prompt:")
        print(prompt)

Number of shots: 0
Prompt:
QUESTION:
What is a for loop used for?
ANSWER:

Prompt:
QUESTION:
What is a function?
ANSWER:

Number of shots: 2
Prompt:
QUESTION:
What is an SQL SELECT query?
ANSWER:
A SELECT query is used to retrieve specific data from one or more tables in a database.

QUESTION:
What is an exception?
ANSWER:
An exception is an error or unexpected event that occurs during program execution and can disrupt the normal flow of the program.

QUESTION:
What is a for loop used for?
ANSWER:

Prompt:
QUESTION:
Difference between == and === in JavaScript?
ANSWER:
== compares values after performing type conversion, while === compares both value and type without any conversion.

QUESTION:
What is the difference between a list and an array?
ANSWER:
A list can grow or shrink dynamically, while an array usually has a fixed size and often stores elements of the same data type.

QUESTION:
What is a function?
ANSWER:

Number of shots: 4
Prompt:
QUESTION:
What is an SQL SELECT query?
ANSW

In [17]:
class LLM:
    """
    Wrapper for tokenizer and model to generate chat responses.

    Parameters
    ----------
    tokenizer : AutoTokenizer
        The tokenizer to use for encoding/decoding.
    model : AutoModelForCausalLM
        The language model for generation.

    Attributes
    ----------
    tokenizer : AutoTokenizer
        The tokenizer instance.
    model : AutoModelForCausalLM
        The language model instance.
    """

    def __init__(self, tokenizer, model):
        self.tokenizer = tokenizer
        self.model = model

    def invoke(
        self,
        messages,
        temperature=0.0,
        top_k=50,
        top_p=0.9,
        max_new_tokens=128,
    ):
        """
        Generate a response from the model.

        Parameters
        ----------
        messages : str or list
            The user message/prompt to send to the model. Can be a string
            (which will be wrapped as a user message) or a list of message dicts.
        temperature : float, optional (default=0.0)
            Controls randomness in generation. Higher values (e.g., 1.5) make
            output more random, lower values (e.g., 0.2) make it more deterministic.
            When set to 0.0, greedy decoding is used (do_sample=False).
        top_k : int, optional (default=50)
            Number of highest probability tokens to keep for top-k filtering.
        top_p : float, optional (default=0.9)
            Nucleus sampling: keeps the smallest set of tokens whose cumulative
            probability exceeds top_p.

        Returns
        -------
        str
            The generated response with thinking tags removed.
        """
        # Convert string to chat format if needed
        if isinstance(messages, str):
            messages = [{"role": "user", "content": messages}]

        # Apply chat template and tokenize
        inputs = self.tokenizer.apply_chat_template(
            messages,
            return_tensors="pt",
            add_generation_prompt=True,
        ).to(self.model.device)

        do_sample = True
        if temperature == 0.0:
            do_sample = False

        # Build generation kwargs
        gen_kwargs = {
            "do_sample": do_sample,
            "pad_token_id": self.tokenizer.eos_token_id,
            "max_new_tokens": max_new_tokens,
        }
        if do_sample:
            gen_kwargs["temperature"] = temperature
            gen_kwargs["top_k"] = top_k
            gen_kwargs["top_p"] = top_p
        with torch.no_grad():
            outputs = self.model.generate(inputs, **gen_kwargs)
        decoded = self.tokenizer.decode(
            outputs[0][inputs.shape[-1] :],
            skip_special_tokens=True,
        )
        # remove <think>...</think> tags
        cleaned = re.sub(r"<think>.*?</think>", "", decoded, flags=re.DOTALL).strip()
        return cleaned


llm = LLM(TOKENIZER, LLM_MODEL)
prompt, main_entry = ds.generate_prompt(number_of_shots=3)
print(f"Prompt:\n{prompt}")
response = llm.invoke(prompt)
print("Generated:", response)
print("Expected:", main_entry.answer)
evaluator.evaluate(response, main_entry.answer)

Prompt:
QUESTION:
What is an API?
ANSWER:
An API (Application Programming Interface) defines rules and methods that allow different software applications to communicate with each other.

QUESTION:
What is an SQL SELECT query?
ANSWER:
A SELECT query is used to retrieve specific data from one or more tables in a database.

QUESTION:
What is a variable in programming?
ANSWER:
A variable is a named memory location used to store data that can be read or modified during program execution.

QUESTION:
What does open source mean?
ANSWER:

Generated: Open source means that the source code of a software application is freely available for anyone to use, modify, and distribute. This allows users to inspect how the software works, improve it, and share their improvements with others. Open-source projects often have active communities where developers collaborate on enhancements and bug fixes.
Expected: Open source software has source code that is publicly available and can be studied, modified, and

0.7695186138153076

In [18]:
# Pipeline to test different invoke parameters
class ParameterPipeline:
    """
    Pipeline to evaluate LLM performance across different generation parameters.

    Parameters
    ----------
    llm : LLM
        The language model wrapper.
    evaluator : Evaluator
        The evaluator for scoring answers.
    dataset : Dataset
        The dataset containing question/answer pairs.

    Attributes
    ----------
    llm : LLM
        The language model wrapper instance.
    evaluator : Evaluator
        The evaluator instance.
    dataset : Dataset
        The dataset instance.
    """

    def __init__(self, llm: LLM, evaluator: Evaluator, dataset: Dataset):
        self.llm = llm
        self.evaluator = evaluator
        self.dataset = dataset

    def run_experiment(
        self,
        param_grid: dict,
        num_samples: int = 5,
        seed: int = 42,
    ) -> pd.DataFrame:
        """
        Run experiments with different parameter combinations.

        Parameters
        ----------
        param_grid : dict
            Dictionary where keys are parameter names and values are lists of values to test.
            Can include "number_of_shots" to vary prompt examples.
            Example: {"temperature": [0.5, 1.0], "top_k": [10, 50], "number_of_shots": [0, 2]}
        num_samples : int, optional (default=5)
            Number of question/answer pairs to use for averaging.
        seed : int, optional (default=42)
            Random seed for reproducibility.

        Returns
        -------
        pd.DataFrame
            Results table with columns: number_of_shots, parameter values,
            avg_score, min_score, max_score.

        Notes
        -----
        For temperature=0.0, duplicate combinations with same number_of_shots are skipped
        since greedy decoding produces deterministic outputs.
        """
        random.seed(seed)

        # Extract number_of_shots from param_grid if present
        number_of_shots_values = param_grid.pop("number_of_shots", [0])

        # Pre-generate prompts and entries for all number_of_shots values at once
        questions = self.dataset.generate_multiple_prompts(
            count=num_samples, number_of_shots=number_of_shots_values
        )

        # Generate all parameter combinations (excluding number_of_shots)
        param_names = list(param_grid.keys())
        param_values = list(param_grid.values())
        invoke_combinations = list(itertools.product(*param_values))

        # Create full combinations including number_of_shots
        all_combinations = list(
            itertools.product(number_of_shots_values, invoke_combinations)
        )

        results = []
        # Track (number_of_shots, top_k) pairs already run for temperature 0
        seen_temp0_combinations = set()

        for number_of_shots, invoke_combo in tqdm(
            all_combinations, desc="Testing parameter combinations"
        ):
            invoke_params = dict(zip(param_names, invoke_combo))

            # For temperature 0, skip top_k and top_p variation for same number_of_shots
            if invoke_params.get("temperature", 1.0) == 0.0:
                key = number_of_shots
                if key in seen_temp0_combinations:
                    continue
                seen_temp0_combinations.add(key)

            scores = []

            # Use pre-built prompts and entries
            for prompt, main_entry in questions[number_of_shots]:
                # Generate response with current invoke parameters
                response = self.llm.invoke(prompt, **invoke_params)

                # Evaluate response
                score = self.evaluator.evaluate(response, main_entry.answer)
                scores.append(score)

            avg_score = sum(scores) / len(scores)

            result = {
                "number_of_shots": number_of_shots,
                **invoke_params,
                "avg_score": avg_score,
                "min_score": min(scores),
                "max_score": max(scores),
            }
            results.append(result)

        return pd.DataFrame(results)

    def display_results(self, df: pd.DataFrame):
        """
        Display results sorted by average score with color gradient.

        Parameters
        ----------
        df : pd.DataFrame
            Results DataFrame from run_experiment.

        Returns
        -------
        pandas.io.formats.style.Styler
            Styled DataFrame with rows sorted by avg_score (descending)
            and a red-yellow-green background gradient on the avg_score column.
        """
        df_sorted = df.sort_values("avg_score", ascending=False)
        return df_sorted.style.background_gradient(subset=["avg_score"], cmap="RdYlGn")


# Define parameter grid to test (including number_of_shots)
param_grid = {
    "number_of_shots": [0, 2, 4],
    "temperature": [0.0, 0.5, 1.0],
    "top_k": [10, 50, 100],
    "top_p": [0.7, 0.95],
}

# Create pipeline and run experiments
pipeline = ParameterPipeline(llm, evaluator, ds)

results_df = pipeline.run_experiment(
    param_grid=param_grid,
    num_samples=5,  # Use 5 question/answer pairs
    seed=42,
)

# Display results table
pipeline.display_results(results_df)

Testing parameter combinations: 100%|██████████| 54/54 [12:09<00:00, 13.51s/it]


Unnamed: 0,number_of_shots,temperature,top_k,top_p,avg_score,min_score,max_score
34,4,1.0,10,0.95,0.876855,0.802863,0.952257
15,2,0.5,10,0.95,0.867095,0.810071,0.944486
35,4,1.0,50,0.7,0.865729,0.803265,0.927156
20,2,1.0,10,0.7,0.86492,0.793881,0.933544
23,2,1.0,50,0.95,0.863209,0.797097,0.919931
28,4,0.5,10,0.95,0.862399,0.779659,0.946969
33,4,1.0,10,0.7,0.858069,0.739278,0.938561
37,4,1.0,100,0.7,0.856847,0.717493,0.938808
26,4,0.0,10,0.7,0.853578,0.733064,0.955262
31,4,0.5,100,0.7,0.850464,0.733231,0.955197
