In [2]:
import torch
import transformers
from transformers import AutoModelForCausalLM, AutoTokenizer
# from utils.generation import is_chat
import time
import pdb

def is_chat(obj):
    """
    Determines if the given object is a properly formatted chat.

    Args:
    messages (list): A list of dictionaries representing chat messages.

    Returns:
    bool: True if the object is a chat, False otherwise.
    """
    # Check if messages is a list
    if not isinstance(obj, list):
        return False
    
    # Check if each item in the list is a dictionary with 'role' and 'content' keys
    for message in obj:
        if not isinstance(message, dict):
            return False
        if 'role' not in message or 'content' not in message:
            return False
        if not isinstance(message['role'], str) or not isinstance(message['content'], str):
            return False
        if message['role'] not in ['system', 'user', 'assistant']:
            return False

    return True

def set_device(device='cuda:0'):
    # Check if the specified device is a GPU and parse its index
    if device.startswith('cuda'):
        # Try to parse out the GPU index after 'cuda:'
        gpu_index = device.split(':')[-1]
        try:
            gpu_index = int(gpu_index)
            if gpu_index >= torch.cuda.device_count() or not torch.cuda.is_available():
                raise ValueError("GPU index out of range or CUDA is not available.")
        except ValueError as e:
            print(f"Specified device '{device}' is not available. Error: {e}")
            device = 'cpu'
    elif device not in ['cpu']:
        print(f"Invalid device '{device}' specified. Falling back to 'cpu'.")
        device = 'cpu'
    return device

def print_gpu_memory_usage():
    mem_allocated = torch.cuda.memory_allocated()/1024**3
    mem_reserved = torch.cuda.memory_reserved()/1024**3
    print("Allocated memory:", mem_allocated, "GB")
    print("Cached memory:", mem_reserved, "GB")
    # return mem_allocated, mem_reserved

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
class WhiteBoxLLM:
    VALID_MODEL_TYPES = ["AutoModelForCausalLM"]
    VALID_UQ_METHODS = ["MSP", "MTE", "PP", "P(True)", "MCSE", "MCNSE", "PMI", "CPMI"]

    def __init__(self, model_type, use_multi_gpu=False):
        if model_type not in self.VALID_MODEL_TYPES:
            raise ValueError(f"Invalid model type. Choose from {self.VALID_MODEL_TYPES}")
        self.use_multi_gpu = use_multi_gpu
        
        self.model_type = model_type
        self.model = None
        self.pipeline=None
        self.tokenizer = None
        self.device = None
        self.dtype = torch.bfloat16
        
    def load_pretrained(self, model_path, device='cpu', dtype=torch.bfloat16):
        """
        Load a pretrained model and tokenizer.

        Args:
        model_path (str): Path to the pretrained model.
        device (str): Device to load the model on ('cpu' or 'cuda').
        dtype (torch.dtype): Data type to use for the model.
        """
        print(f"loading pretrained model from: {model_path}")
        
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)
        self.dtype = dtype
        self.device = set_device(device=device)
        
        if self.use_multi_gpu:
            ### use multiple GPUs with transformers.pipeline
            start = time.time()
            self.pipeline = transformers.pipeline(
                "text-generation",
                model=model_path,
                model_kwargs={"torch_dtype": self.dtype},
                device_map="auto"
            )
            self.tokenizer = self.pipeline.tokenizer
            self.model = self.pipeline.model
            end = time.time()
            print(f"Pipeline constructed in {str(end-start)} seconds. Model is loaded from path:\n {model_path}.")
        else:
            ### load model on device
            start = time.time()
            self.model = AutoModelForCausalLM.from_pretrained(model_path).to(self.device, dtype=self.dtype)    
            print(f"loading {type(self.model)}")
            end = time.time()
            print(f"Model loaded from path: {model_path}.")
            print(f"Time: {str(end-start)} seconds")  
        
    def tokenize_input(self, prompt, mode="text", add_special_tokens=True, tokenize=True):
        """
        Tokenize the input prompt.

        Args:
        prompt (str or list): The input prompt.
        mode (str): Mode of tokenization, either 'text' or 'chat'.
        add_special_tokens (bool): Whether to add special tokens.
        tokenize (bool): If True, returns tokenized input; if False, returns token values.

        Returns:
        dict or list: Tokenized input if `tokenize` is True, otherwise list of token values.
        """
        if mode not in ["text", "chat"]:
            raise ValueError("Mode must be either 'text' or 'chat'")

        if mode == "text":
            if not isinstance(prompt, str):
                raise ValueError("Prompt must be a string for text mode.")
            
        elif mode == "chat":
            ### generation prompt will be added
            if not isinstance(prompt, list) or not all(isinstance(msg, dict) and 'role' in msg and 'content' in msg for msg in prompt):
                raise ValueError("Prompt must be a list of dictionaries with 'role' and 'content' keys for chat mode.")
            prompt = self.tokenizer.apply_chat_template(prompt, tokenize=False, add_generation_prompt=True)

        encoding = self.tokenizer(
            prompt,
            add_special_tokens=add_special_tokens,
            return_tensors='pt' if tokenize else None
        )

        if tokenize:
            return encoding  # Returns tokenized input
        else:
            return self.tokenizer.convert_ids_to_tokens(encoding.input_ids[0])  # Returns list of token values

    def generate(self, input, max_new_tokens=512, temperature=1.0, top_p=0.9, top_k=50, do_sample=False, return_type='raw'):
        """
        Generate text using the loaded model.

        Args:
            input (dict, list, or str): Tokenized input containing input_ids and attention_mask(optional), or a chat object, or plain text.
            max_new_tokens (int): Maximum number of new tokens to generate.
            temperature (float): Sampling temperature.
            top_p (float): Top-p sampling.
            top_k (int): Top-k sampling.
            do_sample (bool): Whether to use sampling; use greedy decoding otherwise.
            return_in_dict (bool): If True, return detailed information in a dictionary.
            
        Returns:
            str or dict: raw output of model.generate() or a dictionary with detailed results if return_in_dict is True.
        """
        VALID_RETURN_TYPES = ['generation_text', 'generation_dict', 'pred_logprobs', 'raw']
        
        if return_type not in VALID_RETURN_TYPES:
            print(f"Invalid return type '{return_type}' specified. Valid return types: {str(VALID_RETURN_TYPES)}. Falling back to 'generation_text'.")
            return_type = 'generation_text'

        if self.tokenizer.pad_token_id is None:
            self.tokenizer.pad_token_id = self.tokenizer.eos_token_id
        terminators = self.tokenizer.eos_token_id
        
        
        # Handling different input types
        if isinstance(input, dict):  # Input is a dict, as the direct input of model.generate()
            if 'input_ids' not in input:
                raise ValueError("The input dictionary must contain 'input_ids'.")
            tokenized_input = input
        elif isinstance(input, list):  # Assuming input is a chat object
            if not is_chat(input):
                raise ValueError("The list input must be a valid chat format.")
            tokenized_input = self.tokenize_input(input, mode='chat', add_special_tokens=True, tokenize=True)
        elif isinstance(input, str):  # Input is a plain text string
            tokenized_input = self.tokenize_input(input, mode='text', add_special_tokens=True, tokenize=True)
        else:
            raise ValueError("Input format not supported. It must be either a dict, list (chat), or string (text).")

        # set attention masks
        if 'attention_mask' not in input:
            tokenized_input['attention_mask'] = tokenized_input['input_ids'].ne(self.tokenizer.pad_token_id).long()

        input_ids = tokenized_input['input_ids'].to(self.model.device)
        attention_mask = tokenized_input['attention_mask'].to(self.model.device)

        # Model inference
        with torch.no_grad():
            output = self.model.generate(
                input_ids,
                attention_mask=attention_mask,
                max_new_tokens=max_new_tokens,
                eos_token_id=terminators,
                do_sample=do_sample,
                temperature=temperature,
                top_p=top_p,
                top_k=top_k,
                return_dict_in_generate=True,
                output_scores=True
            )
        
        if return_type=='raw':
            return output
        
        if return_type=='pred_logprobs':
            ### return the predicted log-probs of new tokens generated in a tuple
            ### apply exponential to get the predicted probability vector for each newly generated token
            ### further apply max on predicted probability vector to get the predicted probability of greedily selected token
            pred_logprobs = tuple(torch.nn.functional.log_softmax(score, dim=1) for score in output.scores)
            return pred_logprobs
        
        
        input_len = input_ids.size(1)
        
        output_ids = output.sequences[0]  
        new_ids = output_ids[input_len:]
        
        new_text = self.tokenizer.decode(new_ids, skip_special_tokens=False, clean_up_tokenization_spaces=False)
        
        if return_type=='generation_text':
            ### return the generated text
            return new_text
        
        
        input_text = self.tokenizer.decode(input_ids[0], skip_special_tokens=False, clean_up_tokenization_spaces=False)
        output_text = self.tokenizer.decode(output_ids, skip_special_tokens=False, clean_up_tokenization_spaces=False)
        input_tokens = self.tokenizer.convert_ids_to_tokens(input_ids[0])
        output_tokens = self.tokenizer.convert_ids_to_tokens(output_ids)
        new_tokens = self.tokenizer.convert_ids_to_tokens(new_ids)
           
        if return_type=='generation_dict':
            ### return the generation dict with rich information
            return {
                "paras": {
                    "encoding": {"add_special_tokens": True},
                    "generation": {
                        "max_new_tokens": max_new_tokens,
                        "temperature": temperature,
                        "top_p": top_p,
                        "top_k": top_k,
                        "do_sample": do_sample
                    },
                    "decoding": {"skip_special_tokens": False, "clean_up_tokenization_spaces": False}
                },
                "input_ids": input_ids[0].tolist(),
                "new_ids": new_ids.tolist(),
                "output_ids": output_ids.tolist(),
                "input_tokens": input_tokens,
                "new_tokens": new_tokens,
                "output_tokens": output_tokens,
                "input_text": input_text,
                "new_text": new_text,
                "output_text": output_text,
                "tokenized_input": {
                    "input_ids": input_ids[0].tolist(),
                    "attention_mask": attention_mask[0].tolist()
                }
            }
        
        return None
        




    def batch_generate():
        pass

    def generate_uncertainty_scores(self, generation_dict, pred_logprobs, uq_methods=None):
        """
        Generate uncertainty scores with the specified uncertainty quantification methods.

        Args:
        generation_dict (dict): Dictionary containing generation information.
        uq_methods (list): List of strings containing the names for the methods.

        Returns:
        dict: Uncertainty dictionary with generated uncertainty scores.
        """
        if uq_methods is None:
            uq_methods = self.VALID_UQ_METHODS

        # Check for invalid methods and remove them from uq_methods
        invalid_methods = [method for method in uq_methods if method not in self.VALID_UQ_METHODS]

        if invalid_methods:
            print(f"Invalid UQ methods: {invalid_methods}. They will be ignored.")
            uq_methods = [method for method in uq_methods if method in self.VALID_UQ_METHODS]
        print(f"Valid UQ methods to execute: {uq_methods}")

        # Create a copy of the generation_dict to uncertainty_dict
        uncertainty_dict = generation_dict.copy()

        # Iterate over uq_methods and call the corresponding method
        for method in uq_methods:
            # Call the method (dummy implementation)
            getattr(self, method.lower().replace('(', '').replace(')', ''))(uncertainty_dict, pred_logprobs)

        return uncertainty_dict


    def split_lines(self, tokens):
        token_split = []
        cur_split = []

        for token in tokens:
            cur_split.append(token)
            if '\n' in token:
                token_split.append(cur_split)
                cur_split = []

        # Append any remaining tokens that did not end with a newline
        if cur_split:
            token_split.append(cur_split)

        # print("Total tokens after split:", sum(len(split) for split in token_split))

        # Create list of lines by joining tokens and removing newlines
        line_split = [''.join(split).replace('\n', '') for split in token_split]

        return line_split, token_split
    
    def msp(self, uncertainty_dict, pred_logprobs):
        new_ids = uncertainty_dict['new_ids']
        
        # Concatenate all probability tensors into a single tensor for batch processing
        logits = torch.cat([log_prob.squeeze(0) for log_prob in pred_logprobs], dim=0)
        # Convert log probabilities to probabilities
        probabilities = torch.softmax(logits, dim=-1)
        
        # Calculate the maximum probability per token
        max_probs = probabilities.max(dim=1).values
        
        # Global MSP score
        global_prob_product = torch.prod(max_probs)
        global_msp = 1 - global_prob_product.item()
        
        if 'global_unc_score' not in uncertainty_dict:
            uncertainty_dict['global_unc_score'] = {}
        uncertainty_dict['global_unc_score'].update({'MSP': global_msp})

        # Linewise MSP scores
        line_split, token_split = self.split_lines(uncertainty_dict['new_text'])

        if 'line_unc_scores' not in uncertainty_dict:
            uncertainty_dict['line_unc_scores'] = []

        current_token_index = 0
        for i, line_tokens in enumerate(token_split):
            line_length = len(line_tokens)
            line_text = line_split[i]

            line_token_ids = new_ids[current_token_index:current_token_index + line_length]
            line_probs = max_probs[current_token_index:current_token_index + line_length]
            line_prob_product = torch.prod(line_probs)
            line_msp = 1 - line_prob_product.item()

            new_scores = {"MSP": line_msp}
            existing_line = next((item for item in uncertainty_dict['line_unc_scores'] if item[0] == line_text), None)

            if existing_line:
                existing_line[2].update(new_scores)
            else:
                uncertainty_dict['line_unc_scores'].append((line_text, new_scores))
            current_token_index += line_length

        return uncertainty_dict


    def mte(self, uncertainty_dict, pred_logprobs):
        new_ids = uncertainty_dict['new_ids']
        
        # Convert list of log probabilities tensors to a single tensor
        logits = torch.cat([log_prob.squeeze(0) for log_prob in pred_logprobs], dim=0)
        
        # Convert log probabilities to probabilities
        probabilities = torch.softmax(logits, dim=-1)

        # Calculate entropy for each token
        entropies = -(probabilities * torch.log2(probabilities)).sum(dim=-1)
        global_mte = entropies.mean().item()

        if 'global_unc_score' not in uncertainty_dict:
            uncertainty_dict['global_unc_score'] = {}
        uncertainty_dict['global_unc_score'].update({'MTE': global_mte})

        # Linewise MTE
        line_split, token_split = self.split_lines(uncertainty_dict['new_text'])
        
        if 'line_unc_scores' not in uncertainty_dict:
            uncertainty_dict['line_unc_scores'] = []

        current_token_index = 0
        for i, line_tokens in enumerate(token_split):
            line_length = len(line_tokens)
            line_text = line_split[i]

            line_token_ids = new_ids[current_token_index:current_token_index + line_length]
            line_probs = probabilities[current_token_index:current_token_index + line_length]
            token_entropies = -(line_probs * torch.log2(line_probs)).sum(dim=-1)

            line_mean_entropy = token_entropies.mean().item()
            new_scores = {"MTE": line_mean_entropy}
            existing_line = next((item for item in uncertainty_dict['line_unc_scores'] if item[0] == line_text), None)

            if existing_line:
                existing_line[2].update(new_scores)
            else:
                uncertainty_dict['line_unc_scores'].append((line_text, line_token_ids, new_scores))
            current_token_index += line_length

        return uncertainty_dict

    def pp(self, uncertainty_dict, pred_logprobs):
        pass

    def ptrue(self, uncertainty_dict, pred_logprobs):
        pass

    def mcse(self, uncertainty_dict, pred_logprobs):
        pass

    def mcnse(self, uncertainty_dict, pred_logprobs):
        pass

    def pmi(self, uncertainty_dict, pred_logprobs):
        pass

    def cpmi(self, uncertainty_dict, pred_logprobs):
        pass

    def decode(self, token_ids, skip_special_tokens=False, clean_up_tokenization_spaces=False):
        """
        Decode the token IDs into a string.

        Args:
        token_ids (list or torch.Tensor): Token IDs to decode.
        skip_special_tokens (bool): Whether to skip special tokens.
        clean_up_tokenization_spaces (bool): Whether to clean up tokenization spaces.

        Returns:
        str: Decoded string.
        """
        if isinstance(token_ids, torch.Tensor):
            token_ids = token_ids.tolist()
        return self.tokenizer.decode(token_ids, skip_special_tokens=skip_special_tokens, clean_up_tokenization_spaces=clean_up_tokenization_spaces)
    

In [4]:
from datasets import load_dataset
dataset = load_dataset("evalplus/humanevalplus")
user_message = dataset['test'][0]['prompt']
task_id = dataset['test'][0]['task_id']

system_message = '''You are a Python code generator. Generate a complete and functioning Python function based on the provided code snippet.
Ensure the function includes the original instructions in the comments, in-line comments for each line of code, and import statements for any required dependencies.
Do not include main function. Enclose your code inside a ```python``` block.'''

chat = [
    {"role": "system", "content": system_message},
    {"role": "user", "content": user_message}
    ]

llm = WhiteBoxLLM('AutoModelForCausalLM')
llm.load_pretrained("TinyLlama/TinyLlama-1.1B-Chat-v1.0")

tokenized_input = llm.tokenize_input(chat, mode='chat', add_special_tokens=True, tokenize=True)
generation_text = llm.generate(chat, max_new_tokens=512, return_type='generation_text')
generation_dict = llm.generate(chat, max_new_tokens=512, return_type='generation_dict')
pred_logprobs = llm.generate(chat, max_new_tokens=512, return_type='pred_logprobs')
raw = llm.generate(chat, max_new_tokens=512, return_type='raw')

loading pretrained model from: TinyLlama/TinyLlama-1.1B-Chat-v1.0




loading <class 'transformers.models.llama.modeling_llama.LlamaForCausalLM'>
Model loaded from path: TinyLlama/TinyLlama-1.1B-Chat-v1.0.
Time: 2.0247466564178467 seconds




In [41]:
def split_lines(token_ids):
    token_split = []
    id_split = []
    position_split = []
    cur_token_split = []
    cur_id_split = []
    cur_position = 0

    tokens = [llm.tokenizer.decode(id) for id in token_ids]

    for token, token_id in zip(tokens, token_ids):
        cur_token_split.append(token)
        cur_id_split.append(token_id)
        if '\n' in token:
            token_split.append(cur_token_split)
            id_split.append(cur_id_split)
            position_split.append(cur_position)
            cur_token_split = []
            cur_id_split = []
            cur_position = len(token_split[-1]) + cur_position

    # Append any remaining tokens that did not end with a newline
    if cur_token_split:
        token_split.append(cur_token_split)
        id_split.append(cur_id_split)
        position_split.append(cur_position)

    # Create list of lines by joining tokens
    line_split = [' '.join(split) for split in token_split]

    return line_split, token_split, id_split, position_split

tokens = generation_dict['new_text']
token_ids = generation_dict['new_ids']

line_split, token_split, id_split, position_split = split_lines(token_ids)

for i, (line, tokens, ids, pos) in enumerate(zip(line_split, token_split, id_split, position_split), 1):
    print(f"Line {i}:")
    print(f"  Text: {line}")
    print(f"  Tokens: {tokens}")
    print(f"  Token IDs: {ids}")
    print(f"  Starting Position: {pos}")
    print()

Line 1:
  Text: # Import necessary libraries 

  Tokens: ['#', 'Import', 'necessary', 'libraries', '\n']
  Token IDs: [29937, 16032, 5181, 9562, 13]
  Starting Position: 0

Line 2:
  Text: import math 

  Tokens: ['import', 'math', '\n']
  Token IDs: [5215, 5844, 13]
  Starting Position: 5

Line 3:
  Text: 

  Tokens: ['\n']
  Token IDs: [13]
  Starting Position: 8

Line 4:
  Text: # Define function to check if two numbers are close to each other 

  Tokens: ['#', 'Define', 'function', 'to', 'check', 'if', 'two', 'numbers', 'are', 'close', 'to', 'each', 'other', '\n']
  Token IDs: [29937, 22402, 740, 304, 1423, 565, 1023, 3694, 526, 3802, 304, 1269, 916, 13]
  Starting Position: 9

Line 5:
  Text: def has _ close _ elements ( numbers : List [ float ], threshold : float ) -> bool : 

  Tokens: ['def', 'has', '_', 'close', '_', 'elements', '(', 'numbers', ':', 'List', '[', 'float', '],', 'threshold', ':', 'float', ')', '->', 'bool', ':', '\n']
  Token IDs: [1753, 756, 29918, 5358, 29918,

In [42]:
def token_indices_by_line(token_ids):
    line_split, token_split,_,_ = split_lines(token_ids)
    token_indices_split =[]
    for i, split in enumerate(token_split):
        # print(token_split[i])
        start_index = sum(len(token_split[j]) for j in range(i))
        token_indices = [k for k in range(start_index, start_index + len(split))]
        token_indices_split.append(token_indices)
    return token_indices_split

tokens = generation_dict['new_text']
token_ids = generation_dict['new_ids']
token_indices_split = token_indices_by_line(token_ids)
print('token_indices_split', token_indices_split)


token_indices_split [[0, 1, 2, 3, 4], [5, 6, 7], [8], [9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22], [23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43], [44, 45, 46, 47, 48], [49, 50, 51, 52, 53, 54, 55, 56, 57], [58, 59, 60, 61, 62, 63, 64, 65], [66, 67], [68, 69, 70, 71, 72, 73, 74, 75, 76, 77], [78, 79, 80, 81, 82, 83, 84], [85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99], [100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125], [126, 127], [128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145], [146, 147, 148, 149, 150, 151, 152], [153, 154, 155, 156, 157, 158, 159], [160, 161, 162, 163, 164, 165, 166], [167, 168], [169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189], [190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204], [205], [206, 207, 208,

In [48]:
import numpy as np

def mte(uncertainty_dict, pred_logprobs):

    new_ids = uncertainty_dict['new_ids']
    new_text = uncertainty_dict['new_text']

    # Compute the entropy for each tensor in the tuple
    entropy_list = []
    for log_prob_tensor in pred_logprobs:
        # Convert log probabilities to probabilities
        probabilities = torch.exp(log_prob_tensor)
        # Compute entropy using the probabilities and their log probabilities
        entropy = -(probabilities * log_prob_tensor).sum(dim=1)
        entropy_list.append(entropy)    

    
    all_entropies = torch.cat(entropy_list)
    global_mte = all_entropies.mean().item()

    if 'global_unc_score' not in uncertainty_dict:
        uncertainty_dict['global_unc_score'] = {}
    uncertainty_dict['global_unc_score'].update({'MTE': global_mte})

    # Split the text into lines
    line_split, token_split,_,_ = split_lines(new_ids)
    token_indices_split = token_indices_by_line(new_ids)

    if 'line_unc_scores' not in uncertainty_dict:
        uncertainty_dict['line_unc_scores'] = []
    
    for i,line_indices in enumerate(token_indices_split):
    # for line_indices in token_indices_split:
        if not line_indices:  # Skip empty lines
            continue
        # line_tokens = [new_text[i] for i in line_indices]
        # line_text = ''.join(line_tokens)
        line_text = line_split[i]
        token_entropies_line = []

        for index in line_indices:
            token_entropies_line.append(all_entropies[index])
        mte_score = np.mean(token_entropies_line)
        new_scores = {"MTE": mte_score}
        
        # Update or append line scores
        existing_line = next((item for item in uncertainty_dict['line_unc_scores'] if item[0] == line_text), None)
        if existing_line:
            existing_line[1].update(new_scores)
        else:
            uncertainty_dict['line_unc_scores'].append((line_text, new_scores))
    
    return uncertainty_dict

uncertainty_dict = generation_dict.copy()
uncertainty_dict = mte(uncertainty_dict, pred_logprobs)
print('uncertainty_dict', uncertainty_dict['global_unc_score'])
print('uncertainty_dict', uncertainty_dict['line_unc_scores'])

uncertainty_dict {'MTE': 0.45145514607429504}
uncertainty_dict [('# Import necessary libraries \n', {'MTE': 1.8605102}), ('import math \n', {'MTE': 0.60075325}), ('\n', {'MTE': 1.2995349}), ('# Define function to check if two numbers are close to each other \n', {'MTE': 0.98222786}), ('def has _ close _ elements ( numbers : List [ float ], threshold : float ) -> bool : \n', {'MTE': 0.030683257}), ('   # Initialize variables \n', {'MTE': 1.6680057}), ('   min _ distance = math . inf \n', {'MTE': 0.7685098}), ('   min _ index = - 1 \n', {'MTE': 0.6230825}), ('    \n', {'MTE': 0.5389683}), ('   # Loop through all numbers in the list \n', {'MTE': 0.73365116}), ('   for num in numbers : \n', {'MTE': 0.262511}), ('       # Calcul ate distance between num and all other numbers in the list \n', {'MTE': 0.902841}), ('       distance = math . sqrt ( sum (( x - num ) **  2 for x in numbers if x != num )) \n', {'MTE': 0.45249292}), ('        \n', {'MTE': 0.4203303}), ('       # If distance is less

In [49]:
def msp(uncertainty_dict, pred_logprobs):

    new_ids = uncertainty_dict['new_ids']
    new_text = uncertainty_dict['new_text']

    # Compute the probabilities for each tensor in the tuple
    prob_list = []
    for i, log_prob_tensor in enumerate(pred_logprobs):
        # Convert log probabilities to probabilities
        probabilities = torch.exp(log_prob_tensor)
        index = new_ids[i]
        prob_list.append(probabilities[0, index].item())    

    # Global MSP score
    total_prob = 1.0
    for prob in prob_list:
        total_prob *= prob
    global_msp = 1 - total_prob
    if 'global_unc_score' not in uncertainty_dict:
        uncertainty_dict['global_unc_score'] = {}
    uncertainty_dict['global_unc_score'].update({'MSP': global_msp})

    # Linewise MSP scores
    # Split the text into lines
    line_split, token_split,_,_ = split_lines(new_ids)
    token_indices_split = token_indices_by_line(new_ids)

    if 'line_unc_scores' not in uncertainty_dict:
        uncertainty_dict['line_unc_scores'] = []
    
    for i,line_indices in enumerate(token_indices_split):
        if not line_indices:  # Skip empty lines
            continue

        line_text = line_split[i]
        line_prob = 1.0
        for index in line_indices:
            line_prob *= prob_list[index]
        line_msp = 1 - line_prob
        new_scores = {"MSP": line_msp}

        existing_line = next((item for item in uncertainty_dict['line_unc_scores'] if item[0] == line_text), None)
        if existing_line:
            existing_line[1].update(new_scores)
        else:
            uncertainty_dict['line_unc_scores'].append((line_text, new_scores))

    return uncertainty_dict


uncertainty_dict = msp(uncertainty_dict, pred_logprobs)
print('uncertainty_dict', uncertainty_dict['global_unc_score'])
print('uncertainty_dict', uncertainty_dict['line_unc_scores'])

uncertainty_dict {'MTE': 0.45145514607429504, 'MSP': 1.0}
uncertainty_dict [('# Import necessary libraries \n', {'MTE': 1.8605102, 'MSP': 0.9631931421175698}), ('import math \n', {'MTE': 0.60075325, 'MSP': 0.4311580859271109}), ('\n', {'MTE': 1.2995349, 'MSP': 0.44248056411743164}), ('# Define function to check if two numbers are close to each other \n', {'MTE': 0.98222786, 'MSP': 0.9931773175858839}), ('def has _ close _ elements ( numbers : List [ float ], threshold : float ) -> bool : \n', {'MTE': 0.030683257, 'MSP': 0.10625167046089246}), ('   # Initialize variables \n', {'MTE': 1.6680057, 'MSP': 0.9774341491921095}), ('   min _ distance = math . inf \n', {'MTE': 0.7685098, 'MSP': 0.9306484062996937}), ('   min _ index = - 1 \n', {'MTE': 0.6230825, 'MSP': 0.9014650450721394}), ('    \n', {'MTE': 0.5389683, 'MSP': 0.5678201430834875}), ('   # Loop through all numbers in the list \n', {'MTE': 0.73365116, 'MSP': 0.955953279146527}), ('   for num in numbers : \n', {'MTE': 0.262511, 'MS

In [50]:
def pp(uncertainty_dict, pred_logprobs):

    new_ids = uncertainty_dict['new_ids']
    new_text = uncertainty_dict['new_text']

    log_prob_list = []
    for i, log_prob_tensor in enumerate(pred_logprobs):
        index = new_ids[i]
        log_prob_list.append(pred_logprobs[i][0, index].item())

    # Global PP score
    avg_log_prob = sum(log_prob_list) / len(log_prob_list)
    global_pp = 2 ** (-avg_log_prob)

    if 'global_unc_score' not in uncertainty_dict:
        uncertainty_dict['global_unc_score'] = {}
    uncertainty_dict['global_unc_score'].update({'PP': global_pp})

    # Linewise PP scores
    # Split the text into lines
    line_split, token_split,_,_ = split_lines(new_ids)
    token_indices_split = token_indices_by_line(new_ids)

    if 'line_unc_scores' not in uncertainty_dict:
        uncertainty_dict['line_unc_scores'] = []
    
    for i,line_indices in enumerate(token_indices_split):
        if not line_indices:  # Skip empty lines
            continue
        line_text = line_split[i]
        line_logprobs = [log_prob_list[i] for i in line_indices]
        avg_line_log_prob = sum(line_logprobs) / len(line_logprobs)
        line_pp =  2 ** (-avg_line_log_prob)
        new_scores = {"PP": line_pp}

        existing_line = next((item for item in uncertainty_dict['line_unc_scores'] if item[0] == line_text), None)
        if existing_line:
            existing_line[1].update(new_scores)
        else:
            uncertainty_dict['line_unc_scores'].append((line_text, new_scores))

    return uncertainty_dict


uncertainty_dict = pp(uncertainty_dict, pred_logprobs)
print('global_unc_score', uncertainty_dict['global_unc_score'])
print('line_unc_scores', uncertainty_dict['line_unc_scores'])

global_unc_score {'MTE': 0.45145514607429504, 'MSP': 1.0, 'PP': 1.1318415078826611}
line_unc_scores [('# Import necessary libraries \n', {'MTE': 1.8605102, 'MSP': 0.9631931421175698, 'PP': 1.5805363668121513}), ('import math \n', {'MTE': 0.60075325, 'MSP': 0.4311580859271109, 'PP': 1.1392235816622556}), ('\n', {'MTE': 1.2995349, 'MSP': 0.44248056411743164, 'PP': 1.4992675916039133}), ('# Define function to check if two numbers are close to each other \n', {'MTE': 0.98222786, 'MSP': 0.9931773175858839, 'PP': 1.2800943784403396}), ('def has _ close _ elements ( numbers : List [ float ], threshold : float ) -> bool : \n', {'MTE': 0.030683257, 'MSP': 0.10625167046089246, 'PP': 1.003714593403087}), ('   # Initialize variables \n', {'MTE': 1.6680057, 'MSP': 0.9774341491921095, 'PP': 1.6914534896269529}), ('   min _ distance = math . inf \n', {'MTE': 0.7685098, 'MSP': 0.9306484062996937, 'PP': 1.2281675053440744}), ('   min _ index = - 1 \n', {'MTE': 0.6230825, 'MSP': 0.9014650450721394, 'PP'

In [52]:
def generate_pTrue_prompt(base_prompt: str):
    system_message = """Answer the following question. Answer True or False without explanation:\n"""
    user_message = f"{base_prompt}"

    chat = [
        {"role":"system", "content":system_message},
        {"role":"user", "content":user_message}
        ]
    return chat


def extract_ptrue_score(generation_dict, pred_logprobs):
    # Find the token for True or False
    ptrue = 0.0
    found_token = False
    for index, (token_id) in enumerate(generation_dict['new_ids']):
        token = llm.tokenizer.decode(token_id)
        if 'True' in token:
            ptrue = pred_logprobs[index][0, token_id].item()
            found_token = True
            break
        elif 'False' in token:
            true_token_id = llm.tokenizer('True')['input_ids'][1]
            ptrue = pred_logprobs[index][0, true_token_id].item()
            found_token = True
            break
    
    # If no True or False token is found, use the first token's probability to be True
    if not found_token:
        true_token_id = llm.tokenizer('True')['input_ids'][1]
        ptrue = pred_logprobs[0][0, true_token_id].item()
    return ptrue

def ptrue(uncertainty_dict, pred_logprobs):

    input_text = uncertainty_dict['input_text']
    new_text = uncertainty_dict['new_text']
    new_ids = generation_dict['new_ids']

    verification_prompt = f"Question: {input_text}\nProposed Answer: {new_text}\nIs the content in the proposed answer correct?\n(A) True\n(B) False\nAnswer True or False without explanation:"
    verification_prompt = generate_pTrue_prompt(verification_prompt)

    global_ptrue_generation_dict = llm.generate(verification_prompt, max_new_tokens=512, return_type='generation_dict')
    global_ptrue_pred_logprobs = llm.generate(verification_prompt, max_new_tokens=512, return_type='pred_logprobs')

    global_ptrue = extract_ptrue_score(global_ptrue_generation_dict, global_ptrue_pred_logprobs)
    if 'global_unc_score' not in uncertainty_dict:
        uncertainty_dict['global_unc_score'] = {}
    uncertainty_dict['global_unc_score'].update({'pTrue': global_ptrue})

    # Linewise ptrue scores
    # Split the text into lines
    line_split, token_split,_,_ = split_lines(new_ids)

    token_indices_split = token_indices_by_line(new_ids)
    for i,line_indices in enumerate(token_indices_split):
        if not line_indices:  # Skip empty lines
            continue
        line_text = line_split[i]

        line_verif_prompt = f"Question: {input_text}\nProposed Answer: {new_text}\nIs the sentence \"{line_text}\" in the proposed answer correct?\n(A) True\n(B) False\nAnswer True or False without explanation:"
        line_verif_prompt = generate_pTrue_prompt(line_verif_prompt)

        line_ptrue_generation_dict = llm.generate(line_verif_prompt, max_new_tokens=512, return_type='generation_dict')
        line_ptrue_pred_logprobs = llm.generate(line_verif_prompt, max_new_tokens=512, return_type='pred_logprobs')

        line_ptrue = extract_ptrue_score(line_ptrue_generation_dict, line_ptrue_pred_logprobs)
        new_scores = {"pTrue": line_ptrue}

        existing_line = next((item for item in uncertainty_dict['line_unc_scores'] if item[0] == line_text), None)
        if existing_line:
            existing_line[1].update(new_scores)
        else:
            uncertainty_dict['line_unc_scores'].append((line_text, new_scores))

    return uncertainty_dict

uncertainty_dict = ptrue(uncertainty_dict, pred_logprobs)
print('global_unc_score', uncertainty_dict['global_unc_score'])
print('line_unc_scores', uncertainty_dict['line_unc_scores'])

global_unc_score {'MTE': 0.45145514607429504, 'MSP': 1.0, 'PP': 1.1318415078826611, 'pTrue': -10.095816612243652}
line_unc_scores [('# Import necessary libraries \n', {'MTE': 1.8605102, 'MSP': 0.9631931421175698, 'PP': 1.5805363668121513, 'pTrue': -10.263795852661133}), ('import math \n', {'MTE': 0.60075325, 'MSP': 0.4311580859271109, 'PP': 1.1392235816622556, 'pTrue': -10.534920692443848}), ('\n', {'MTE': 1.2995349, 'MSP': 0.44248056411743164, 'PP': 1.4992675916039133, 'pTrue': -9.946125030517578}), ('# Define function to check if two numbers are close to each other \n', {'MTE': 0.98222786, 'MSP': 0.9931773175858839, 'PP': 1.2800943784403396, 'pTrue': -10.49610424041748}), ('def has _ close _ elements ( numbers : List [ float ], threshold : float ) -> bool : \n', {'MTE': 0.030683257, 'MSP': 0.10625167046089246, 'PP': 1.003714593403087, 'pTrue': -9.940467834472656}), ('   # Initialize variables \n', {'MTE': 1.6680057, 'MSP': 0.9774341491921095, 'PP': 1.6914534896269529, 'pTrue': -10.31

In [58]:
def generate_mcse_prompt(input_text, line_responce):
    system_message = """Generate a response to the following input:\n"""
    base_prompt = input_text + '\n' + line_responce
    user_message = f"{base_prompt}"

    chat = [
        {"role":"system", "content":system_message},
        {"role":"user", "content":user_message}
        ]
    return chat

def mcse(uncertainty_dict, pred_logprobs):
    
    input_text = uncertainty_dict['input_text']
    new_ids = uncertainty_dict['new_ids']

    num_samples = 3
    total_logprob = 0
    for _ in range(num_samples):
        pred_logprobs = llm.generate(input_text, max_new_tokens=512, return_type='pred_logprobs')

        log_prob_list = []
        for i, log_prob_tensor in enumerate(pred_logprobs):
            index = token_ids[i]
            log_prob_list.append(pred_logprobs[i][0, index].item())
        
        temp_logprob = sum(log_prob_list)
        total_logprob += temp_logprob
    
    global_mcse = -total_logprob / num_samples
        
    if 'global_unc_score' not in uncertainty_dict:
        uncertainty_dict['global_unc_score'] = {}
    uncertainty_dict['global_unc_score'].update({'MCSE': global_mcse})

    # Linewise ptrue scores
    # Split the text into lines
    line_split, _,_,_ = split_lines(new_ids)

    line_responce = ''
    token_indices_split = token_indices_by_line(new_ids)
    for i,line_indices in enumerate(token_indices_split):
        if not line_indices:  # Skip empty lines
            continue
        line_text = line_split[i]

        print('line_text', line_text)

        temp_mcse_prompt = generate_mcse_prompt(input_text, line_responce)

        line_total_logprob = 0
        for _ in range(num_samples):
            
            temp_generation_dict = llm.generate(temp_mcse_prompt, max_new_tokens=512, return_type='generation_dict')
            temp_pred_logprobs = llm.generate(temp_mcse_prompt, max_new_tokens=512, return_type='pred_logprobs')
        
            temp_new_ids = temp_generation_dict['new_ids']

            temp_token_indices_split = token_indices_by_line(temp_new_ids)

            temp_log_prob_list = []
            for i, log_prob_tensor in enumerate(temp_pred_logprobs):
                index = temp_new_ids[i]
                temp_log_prob_list.append(temp_pred_logprobs[i][0, index].item())

            # we only need the first sentence in the 'new_text'
            line_logprobs = sum([temp_log_prob_list[i] for i in temp_token_indices_split[0]])
            line_total_logprob += line_logprobs
        
        line_mcse = -line_total_logprob / num_samples
        new_scores = {"MCSE": line_mcse}

        existing_line = next((item for item in uncertainty_dict['line_unc_scores'] if item[0] == line_text), None)
        if existing_line:
            existing_line[1].update(new_scores)
        else:
            uncertainty_dict['line_unc_scores'].append((line_text, new_scores))

        line_responce += line_text


    return uncertainty_dict

uncertainty_dict = mcse(uncertainty_dict, pred_logprobs)
print('global_unc_score', uncertainty_dict['global_unc_score'])
print('line_unc_scores', uncertainty_dict['line_unc_scores'])

line_text # Import necessary libraries 





line_text import math 

line_text 

line_text # Define function to check if two numbers are close to each other 



In [None]:
def mcnse(uncertainty_dict, pred_logprobs):
    
    input_text = uncertainty_dict['input_text']
    new_ids = uncertainty_dict['new_ids']

    num_samples = 3

    total_normalized_logprob = 0
    for _ in range(num_samples):
        # Generate predicted log probabilities for each token in the sequence
        pred_logprobs = llm.generate(input_text, max_new_tokens=512, return_type='pred_logprobs')

        log_prob_list = []
        sequence_length = 0  # To track the length of each generated sequence

        # Calculate log probabilities for each generated token
        for i, log_prob_tensor in enumerate(pred_logprobs):
            index = token_ids[i]  # Index of the current token in the model's vocabulary
            log_prob_list.append(log_prob_tensor[0, index].item())
            sequence_length += 1  # Increment the sequence length for each token processed

        # Sum the log probabilities for the current sequence
        temp_logprob = sum(log_prob_list)

        # Normalize the summed log probabilities by the sequence length
        if sequence_length > 0:  # Avoid division by zero
            normalized_logprob = temp_logprob / sequence_length
            total_normalized_logprob += normalized_logprob

    # Compute the Monte Carlo Normalized Sequence Entropy
    global_mcnse = -total_normalized_logprob / num_samples
        
    if 'global_unc_score' not in uncertainty_dict:
        uncertainty_dict['global_unc_score'] = {}
    uncertainty_dict['global_unc_score'].update({'MCNSE': global_mcnse})

    # Linewise ptrue scores
    # Split the text into lines
    line_split, _,_,_ = split_lines(new_ids)

    line_responce = ''
    token_indices_split = token_indices_by_line(new_ids)
    for i,line_indices in enumerate(token_indices_split):
        if not line_indices:  # Skip empty lines
            continue
        line_text = line_split[i]

        print('line_text', line_text)

        temp_mcse_prompt = generate_mcse_prompt(input_text, line_responce)

        line_total_normalized_logprob = 0
        for _ in range(num_samples):
            # Generate a generation dictionary and corresponding log probabilities
            temp_generation_dict = llm.generate(temp_mcse_prompt, max_new_tokens=512, return_type='generation_dict')
            temp_pred_logprobs = llm.generate(temp_mcse_prompt, max_new_tokens=512, return_type='pred_logprobs')
            
            # Extract new token IDs from the generation dictionary
            temp_new_ids = temp_generation_dict['new_ids']

            # Split the token IDs by line to process each line individually
            temp_token_indices_split = token_indices_by_line(temp_new_ids)

            # Initialize a list to store log probabilities
            temp_log_prob_list = []
            for i, log_prob_tensor in enumerate(temp_pred_logprobs):
                index = temp_new_ids[i]
                temp_log_prob_list.append(temp_pred_logprobs[i][0, index].item())

            # Calculate log probabilities for the first sentence in the 'new_text'
            line_logprobs = sum([temp_log_prob_list[i] for i in temp_token_indices_split[0]])
            line_length = len(temp_token_indices_split[0])  # Length of the first line

            # Normalize the log probabilities by the line length if the line length is not zero
            if line_length > 0:
                normalized_line_logprobs = line_logprobs / line_length
                line_total_normalized_logprob += normalized_line_logprobs

        # Compute the Monte Carlo Normalized Sequence Entropy for the line
        line_mcnse = -line_total_normalized_logprob / num_samples
        new_scores = {"MCNSE": line_mcnse}

        existing_line = next((item for item in uncertainty_dict['line_unc_scores'] if item[0] == line_text), None)
        if existing_line:
            existing_line[1].update(new_scores)
        else:
            uncertainty_dict['line_unc_scores'].append((line_text, new_scores))

        line_responce += line_text


    return uncertainty_dict

uncertainty_dict = mcnse(uncertainty_dict, pred_logprobs)
print('global_unc_score', uncertainty_dict['global_unc_score'])
print('line_unc_scores', uncertainty_dict['line_unc_scores'])

In [32]:
id = llm.tokenizer('True')['input_ids'][1]
text = llm.tokenizer.decode([5852])

print(id, text)

5852 True
