In [None]:
%load_ext autoreload
%autoreload 2

# Specify hyperparameters

In [None]:
# Model identifier passed to vllm.LLM(model=...) when the engine is created.
model_name_or_path = "meta-llama/Llama-3.1-8B-Instruct"
# Device handed to the Deberta NLI model used by the NLI stat calculator.
device = "cuda:0"
# NOTE(review): not referenced by any cell visible in this notebook — confirm it is still needed.
dataset_name = "../workdir/data/triviaqa.csv"
# Number of prompts per DataLoader batch in the inference loop.
batch_size = 2

# Initialize Model

In [None]:
import os
# Restrict the process to physical GPU 2. This is set before vllm is imported,
# presumably so that CUDA initialisation only ever sees this device — confirm.
os.environ["CUDA_VISIBLE_DEVICES"] = "2"

from vllm import LLM, SamplingParams

# gpu_memory_utilization=0.5 limits vLLM to half of the GPU memory; the Deberta
# NLI model is later loaded on `device` as well.
llm = LLM(model=model_name_or_path, gpu_memory_utilization=0.5)
# Generate at most 30 tokens and request the top-20 log-probabilities per generated
# token; InfervLLMCalculator reads them from each output's `logprobs` field.
# NOTE(review): no temperature is set here, so vLLM's default sampling settings apply,
# while the downstream statistics are named "greedy_*" — confirm whether
# temperature=0 was intended.
sampling_params = SamplingParams(max_tokens=30, logprobs=20)

In [None]:
# Demo conversations: each entry is a single-turn chat holding one user question.
messages = [
    [{"role": "user", "content": question}]
    for question in (
        "How many fingers on a coala's foot?",
        "Who sang a song Yesterday?",
        "Кто спел песню Кукла Колдуна?",
        "Translate into French: 'I want a small cup of coffee'",
    )
]

# Render every conversation into a single prompt string with the model's chat template.
# add_generation_prompt=True appends the assistant-turn header, so the model starts
# answering instead of continuing (or re-opening) the user's turn.
tokenizer = llm.get_tokenizer()
chat_messages = [
    tokenizer.apply_chat_template(m, tokenize=False, add_generation_prompt=True)
    for m in messages
]

# Run LLM inference and compute uncertainty scores

In [None]:
from typing import List
from lm_polygraph.utils.model import Model
from vllm import LLM

class WhiteboxModelvLLM(Model):
    """Basic whitebox model adapter for using vLLM in stat calculators and uncertainty estimators.

    Wraps a ``vllm.LLM`` engine and exposes its tokenizer, so that code written
    against the ``Model`` interface can call ``generate``, ``tokenize`` and
    ``generate_texts`` on it.
    """

    def __init__(self, model: LLM):
        """Store the vLLM engine and fetch its tokenizer.

        Parameters:
            model (LLM): an already initialised vLLM engine.
        """
        self.model = model
        self.tokenizer = self.model.get_tokenizer()

    def generate(self, *args, **kwargs):
        """Forward all arguments to ``LLM.generate`` and return its raw outputs."""
        return self.model.generate(*args, **kwargs)

    def tokenize(self, *args, **kwargs):
        """Forward all arguments to the underlying tokenizer call."""
        return self.tokenizer(*args, **kwargs)

    def __call__(self, *args, **kwargs):
        """Alias for :meth:`generate`."""
        return self.generate(*args, **kwargs)

    def generate_texts(self, input_texts: List[str], **args):
        """Generate one completion per prompt and return only the texts.

        Parameters:
            input_texts (List[str]): prompts to complete.
            **args: only ``sampling_params`` is used; any other keyword is ignored.
                When ``sampling_params`` is omitted, ``None`` is passed on so that
                vLLM falls back to its default sampling parameters instead of this
                method failing with a ``KeyError``.
        Returns:
            List[str]: text of the first completion of every prompt.
        """
        sampling_params = args.pop("sampling_params", None)
        outputs = self.generate(input_texts, sampling_params)
        return [output.outputs[0].text for output in outputs]

In [None]:
from lm_polygraph.stat_calculators import StatCalculator
from lm_polygraph.utils.model import Model

from typing import Dict, List, Tuple

import numpy as np

import torch


class InfervLLMCalculator(StatCalculator):
    """
    Runs generation with a vLLM-backed model and converts the raw vLLM output
    into token-level statistics.

    For every input text in the batch it produces:
    * tokens of the generation (cut after the first EOS token),
    * a dense (length, vocab_size) matrix of log-probabilities, filled with -inf
      everywhere except for the top candidates returned by vLLM,
    * log-likelihoods of the generated tokens,
    * the top ``n_alternatives`` candidate tokens at every position.
    """

    def __init__(
        self,
        n_alternatives: int = 10,
        return_embeddings: bool = False,
    ):
        """
        Parameters:
            n_alternatives (int): number of top candidate tokens kept per generated
                position. Default: 10.
            return_embeddings (bool): request embeddings in the output. Default: False.
        """
        super().__init__()

        self.n_alternatives = n_alternatives
        self._return_embeddings = return_embeddings  # not supported by vLLM

    @staticmethod
    def meta_info() -> Tuple[List[str], List[str]]:
        """
        Returns the statistics and dependencies for the calculator.
        """

        return [
            "greedy_log_probs",
            "greedy_logits",
            "greedy_tokens",
            "greedy_log_likelihoods",
            "greedy_tokens_alternatives",
        ], []

    def _post_process_logits(self, out, model_inputs, vocab_size, eos_token_id):
        """
        Converts vLLM request outputs into per-sample statistics.

        Parameters:
            out: sequence of vLLM request outputs; only the first completion
                (``outputs[0]``) of each is used.
            model_inputs: the input batch; only its length is used.
            vocab_size (int): width of the dense log-probability matrices. Must be
                larger than every token id present in the returned logprobs.
            eos_token_id: token id after which a generation is cut (the EOS token
                itself is kept).
        Returns:
            dict with keys 'greedy_log_probs', 'greedy_logits', 'greedy_tokens',
            'greedy_log_likelihoods' and 'greedy_tokens_alternatives'.
        Raises:
            ValueError: if a completion carries no logprobs.
        """
        cut_logits = []
        cut_sequences = []
        cut_log_probs = []
        cut_alternatives = []
        lls = []

        for sample_idx in range(len(model_inputs)):
            completion = out[sample_idx].outputs[0]

            # Keep the generation up to and including the first EOS token.
            seq = np.array(completion.token_ids)
            length = len(seq)
            for j, token_id in enumerate(seq):
                if token_id == eos_token_id:
                    length = j + 1
                    break

            tokens = seq[:length].tolist()
            cut_sequences.append(tokens)

            top_log_probs = completion.logprobs
            if top_log_probs is None:
                raise ValueError(
                    "vLLM output contains no logprobs; "
                    "set `logprobs` in SamplingParams to use InfervLLMCalculator."
                )

            # Dense matrix: -inf for every token vLLM did not report at a position.
            # A separate loop index is used so the sample index is not overwritten.
            log_probs = np.full((len(top_log_probs), vocab_size), -np.inf)
            for pos, token_logprobs in enumerate(top_log_probs):
                top_tokens = np.array(list(token_logprobs.keys()))
                top_values = np.array([lp.logprob for lp in token_logprobs.values()])
                log_probs[pos, top_tokens] = top_values

            log_probs = log_probs[:length, :]
            # Probabilities; unreported tokens become exactly 0.
            probs = np.exp(log_probs)

            # vLLM does not expose raw logits here, so log-probabilities are stored
            # under both keys.
            cut_logits.append(log_probs)
            cut_log_probs.append(log_probs)
            lls.append([log_probs[j, tokens[j]] for j in range(len(log_probs))])

            sample_alternatives = []
            for j in range(length):
                lt = probs[j, :]
                best_tokens = np.argpartition(lt, -self.n_alternatives)
                ln = len(best_tokens)
                best_tokens = best_tokens[ln - self.n_alternatives : ln]
                # NOTE(review): the stored values are probabilities (exp of the
                # log-probabilities), not log-probabilities — confirm this is what
                # the downstream estimator expects.
                position_alternatives = [(t, lt[t]) for t in best_tokens]
                # Stable sort that moves the generated token to the front.
                position_alternatives.sort(
                    key=lambda x: x[0] == tokens[j],
                    reverse=True,
                )
                sample_alternatives.append(position_alternatives)
            cut_alternatives.append(sample_alternatives)

        result_dict = {
            "greedy_log_probs": cut_log_probs,
            "greedy_logits": cut_logits,
            "greedy_tokens": cut_sequences,
            "greedy_log_likelihoods": lls,
            "greedy_tokens_alternatives": cut_alternatives,
        }

        return result_dict

    def __call__(
        self,
        dependencies: Dict[str, np.array],
        texts: List[str],
        model: Model,
        max_new_tokens: int = 100,
        **kwargs,
    ) -> Dict[str, np.ndarray]:
        """
        Calculates the statistics of probabilities at each token position in the generation.

        Parameters:
            dependencies (Dict[str, np.ndarray]): input statistics, can be empty (not used).
            texts (List[str]): Input texts batch used for model generation.
            model (Model): Model used for generation; must expose `generate` and `tokenizer`.
            max_new_tokens (int): Not used by this calculator; the generation length is
                governed by the arguments forwarded to `model.generate`. Default: 100.
            **kwargs: forwarded unchanged to `model.generate` (e.g. `sampling_params`).
        Returns:
            Dict[str, np.ndarray]: dictionary with the following items:
                - 'greedy_log_probs' (List[np.ndarray]): per-sample (length, vocab_size)
                        log-probability matrices, -inf for tokens not reported by vLLM,
                - 'greedy_logits' (List[np.ndarray]): same matrices as 'greedy_log_probs',
                - 'greedy_tokens' (List[List[int]]): tokenized model generations,
                - 'greedy_log_likelihoods' (List[List[float]]): log-probabilities of the generated tokens,
                - 'greedy_tokens_alternatives' (List[List[List[Tuple[int, float]]]]): top
                        `n_alternatives` candidates per position, generated token first.
        """
        out = model.generate(texts, **kwargs)

        # Token ids are zero-based, so the matrix width must be (largest id + 1).
        # `default=-1` covers tokenizers without any added tokens.
        tokenizer = model.tokenizer
        max_added_token_id = max(tokenizer.added_tokens_decoder.keys(), default=-1)
        vocab_size = max(tokenizer.vocab_size, max_added_token_id + 1)

        result_dict = self._post_process_logits(
            out, texts, vocab_size, tokenizer.eos_token_id
        )
        if self._return_embeddings:
            # NOTE(review): `_get_embeddings_from_output` is not defined in this class;
            # it must be provided by StatCalculator for this branch to work — confirm.
            result_dict.update(
                {"embeddings_decoder": self._get_embeddings_from_output(out)}
            )

        return result_dict

In [None]:
from lm_polygraph.stat_calculators.greedy_alternatives_nli import GreedyAlternativesNLICalculator
from lm_polygraph.estimators.claim_conditioned_probability import ClaimConditionedProbability
from lm_polygraph.utils.deberta import Deberta

from torch.utils.data import DataLoader

# Adapter exposing the vLLM engine through the lm_polygraph Model interface.
model_adapter = WhiteboxModelvLLM(llm)

# Stat calculators: LLM inference first, then NLI over the token alternatives.
calc_infer_llm = InfervLLMCalculator()
nli_model = Deberta(device=device)
nli_model.setup()
calc_nli = GreedyAlternativesNLICalculator(nli_model=nli_model)

estimator = ClaimConditionedProbability()

data_loader = DataLoader(
    chat_messages,
    batch_size=batch_size,
    shuffle=False,
    collate_fn=lambda x: x,  # keep every batch as a plain list of prompt strings
)
for batch in data_loader:
    # Statistics are accumulated per batch; each calculator sees what came before it.
    deps = {}
    llm_stats = calc_infer_llm(
        deps,
        texts=batch,
        model=model_adapter,
        sampling_params=sampling_params,
    )
    deps.update(llm_stats)
    nli_stats = calc_nli(deps, texts=None, model=model_adapter)
    deps.update(nli_stats)

    uncertainty_scores = estimator(deps)
    generated_texts = tokenizer.batch_decode(deps["greedy_tokens"])

    for text, ue_score in zip(generated_texts, uncertainty_scores):
        print("Output:", text)
        # One blank line after every sample.
        print("Uncertainty score:", ue_score, end="\n\n")