# Dataset

In [8]:
from datasets import load_dataset
# Evaluation file, addressed relative to the notebook's working directory.
relative_path_to_data = './production_eval_chat.json'

# Read the JSON file with the generic 'json' builder and expose it as the "train" split.
# NOTE(review): the output recorded for this cell shows pyarrow failing to parse the
# file ("Column() changed from object to array in row 0") - the on-disk JSON layout
# probably has to be reshaped before this load can succeed; confirm against the file.
dataset = load_dataset(
    'json',
    data_files={'train': relative_path_to_data},
    split="train",
)

# Show the "text" field of one sample as a quick sanity check.
print(dataset[30]["text"])

Using custom data configuration default-c24bdeeb726a4b8c


Downloading and preparing dataset json/default to C:\Users\1seba\.cache\huggingface\datasets\json\default-c24bdeeb726a4b8c\0.0.0\da492aad5680612e4028e7f6ddc04b1dfcec4b64db470ed7cc5f2bb265b9b6b5...


Downloading data files:   0%|          | 0/1 [00:00<?, ?it/s]

Extracting data files:   0%|          | 0/1 [00:00<?, ?it/s]

0 tables [00:00, ? tables/s]

Failed to read file 'C:\Projects\gaitor-function-calling\evaluation\production_eval_chat.json' with error <class 'pyarrow.lib.ArrowInvalid'>: JSON parse error: Column() changed from object to array in row 0


AttributeError: 'list' object has no attribute 'keys'

# Load Models

In [None]:
from transformers import AutoTokenizer
from peft import AutoPeftModelForCausalLM
import torch

# Hub identifier of the fine-tuned function-calling checkpoint.
hub_id = "SebastianS/function_calling-llama_7b"

# Tokenizer published under the same identifier as the model.
fc_tokenizer = AutoTokenizer.from_pretrained(hub_id)

# PEFT-wrapped causal LM, requested with float16 dtype, 4-bit loading and
# reduced CPU memory usage during loading.
fc_model = AutoPeftModelForCausalLM.from_pretrained(
    hub_id,
    torch_dtype=torch.float16,
    load_in_4bit=True,
    low_cpu_mem_usage=True,
)


# Metric

In [None]:
import re
import json
from transformers import AutoTokenizer, AutoModel
from scipy.spatial.distance import cosine

# Start/end marker pair for each tagged section of a function-calling prompt,
# keyed by tag name; each marker is the tag name wrapped as <TAG> / </TAG>.
function_calling_tokens = {
    tag_name: {"start": f"<{tag_name}>", "end": f"</{tag_name}>"}
    for tag_name in ("FUNCTIONS", "FUNCTION_CALL_NAME", "FUNCTION_CALL_ARGUMENTS")
}
# "all" lists every marker as one flat sequence: start then end, in the tag order above.
function_calling_tokens["all"] = [
    marker
    for pair in list(function_calling_tokens.values())
    for marker in (pair["start"], pair["end"])
]

def parse_prompt_back_to_data(prompt, tokens=None):
    """
    Parse a constructed prompt back into the original data format.

    The prompt is expected to contain a function list wrapped in the FUNCTIONS markers,
    a user message between "<</SYS>>\\n\\n" and " [/INST]", and after "[/INST] " either a
    function call (FUNCTION_CALL_NAME / FUNCTION_CALL_ARGUMENTS markers) or plain content
    terminated by "</s>". All patterns are matched with re.DOTALL so that a multi-line
    function list, user message or answer is captured instead of failing to match.

    :param prompt: A string representing the constructed prompt.
    :param tokens: Optional dictionary containing the start and end tokens for the
        different function call elements. Defaults to the module-level
        ``function_calling_tokens``.
    :return: A dictionary with an "input" list (user message plus functions) and a
        "target" dict (assistant message plus the same functions list). The target
        message carries "function_call" (name and raw arguments string) when a function
        call is present, otherwise "content" when the answer pattern matched.
    :raises AttributeError: If the functions block or the user message cannot be found,
        or if a function call is started but its name/arguments are not closed
        (``re.search`` returns None in those cases).
    :raises json.JSONDecodeError: If the functions block is not valid JSON.
    """
    if tokens is None:
        tokens = function_calling_tokens

    def tagged_pattern(tag):
        # Escape the markers so they are always matched literally, whatever they contain.
        start = re.escape(tokens[tag]['start'])
        end = re.escape(tokens[tag]['end'])
        return rf"{start}(.*?){end}"

    # Building regular expression patterns using the function calling tokens
    functions_pattern = tagged_pattern('FUNCTIONS')
    input_pattern = r"<</SYS>>\n\n(.*?) \[/INST\]"  # Not part of the function calling tokens
    target_content_pattern = r"\[/INST\] (.*)</s>"  # Not part of the function calling tokens
    function_call_name_pattern = tagged_pattern('FUNCTION_CALL_NAME')
    function_call_arguments_pattern = tagged_pattern('FUNCTION_CALL_ARGUMENTS')

    # Extracting data using regular expressions; DOTALL lets "." span line breaks.
    functions_str = re.search(functions_pattern, prompt, re.DOTALL).group(1)
    input_content = re.search(input_pattern, prompt, re.DOTALL).group(1)
    target_content_match = re.search(target_content_pattern, prompt, re.DOTALL)

    # Parse functions JSON string
    functions = json.loads(functions_str)

    # Prepare the data dictionary
    data = {
        "input": [{
            "chatgptMessage": {"role": "user", "content": input_content},
            "functions": functions
        }],
        "target": {
            "chatgptMessage": {"role": "assistant"},
            "functions": functions  # Same list object as in the input entry
        }
    }

    # Check if the target has a function call
    if tokens['FUNCTION_CALL_NAME']['start'] in prompt:
        function_call_name = re.search(function_call_name_pattern, prompt, re.DOTALL).group(1)
        function_call_arguments = re.search(function_call_arguments_pattern, prompt, re.DOTALL).group(1)
        data["target"]["chatgptMessage"]["function_call"] = {
            "name": function_call_name,
            "arguments": function_call_arguments  # Kept as the raw string; callers decode it
        }
    elif target_content_match:
        # Plain-text answer; left out entirely when the answer pattern did not match.
        data["target"]["chatgptMessage"]["content"] = target_content_match.group(1)

    return data

# Sentence-embedding checkpoint (e.g., SBERT) used by the similarity helpers;
# the tokenizer and the encoder are both fetched under the same model name.
embedding_model_name = "sentence-transformers/bert-base-nli-mean-tokens"
embedding_tokenizer, embedding_model = (
    loader.from_pretrained(embedding_model_name)
    for loader in (AutoTokenizer, AutoModel)
)

def get_sentence_embedding(sentence):
    """
    Embed `sentence` by mean-pooling the embedding model's last hidden state.

    The text is tokenized with `embedding_tokenizer`, run through `embedding_model`
    without gradient tracking, and the token vectors are averaged using the attention
    mask as weights so masked positions do not contribute.

    :param sentence: Text passed to the tokenizer.
    :return: The pooled vector of the first batch entry, as a NumPy array.
    """
    encoded = embedding_tokenizer(sentence, return_tensors='pt', padding=True, truncation=True)
    with torch.no_grad():
        token_states = embedding_model(**encoded).last_hidden_state

    # Broadcast the mask over the hidden dimension so it can weight every token vector.
    mask = encoded['attention_mask'].unsqueeze(-1).expand(token_states.size()).float()
    weighted_sum = (token_states * mask).sum(dim=1)
    # Clamp the token count so an all-zero mask cannot cause a division by zero.
    token_count = mask.sum(dim=1).clamp(min=1e-9)

    return (weighted_sum / token_count)[0].numpy()

def sentence_similarity(sent1, sent2):
    """
    Return the cosine similarity between the embeddings of two sentences.

    :param sent1: First text; embedded with `get_sentence_embedding`.
    :param sent2: Second text; embedded with `get_sentence_embedding`.
    :return: One minus the cosine distance between the two embeddings.
    """
    first_vector, second_vector = (
        get_sentence_embedding(text) for text in (sent1, sent2)
    )
    cosine_distance = cosine(first_vector, second_vector)
    return 1 - cosine_distance

def custom_metric(generated_json, expected_json):
    """
    Score how closely a generated JSON object matches the expected one.

    Every key of ``expected_json`` (recursing into nested, non-empty objects) contributes
    one key score, and every leaf value contributes one value score:

    * key score: 1 for an exact key match, otherwise the best ``sentence_similarity``
      between the expected key and any generated key, or 0 when the generated object
      has no keys to compare against;
    * value score: ``sentence_similarity`` when both values are strings, otherwise 1 if
      the two values are equal and 0 if they are not. An expected empty object is
      treated as a leaf and compared by equality.

    Keys that only appear in ``generated_json`` are not penalised.

    :param generated_json: Dict decoded from the generated arguments.
    :param expected_json: Dict decoded from the expected arguments.
    :return: Mean of the average key score and the average value score. Two empty
        objects are a perfect match and score 1.0.
    """
    # Nothing expected and nothing generated: identical, so do not fall through to the
    # "no scores collected" default of 0.
    if not expected_json and not generated_json:
        return 1.0

    key_similarity_scores = []
    value_similarity_scores = []

    def compare_json(g_json, e_json):
        for e_key, e_value in e_json.items():
            # Check for exact key match or find the most similar key
            if e_key in g_json:
                g_key = e_key
                key_similarity_scores.append(1)
            elif g_json:
                # Compute similarity with all generated keys and keep the best match
                key_similarity = {gen_key: sentence_similarity(e_key, gen_key) for gen_key in g_json.keys()}
                g_key, key_sim_score = max(key_similarity.items(), key=lambda x: x[1])
                key_similarity_scores.append(key_sim_score)
            else:
                # No generated keys at all: max() over nothing would raise, so record
                # the expected entry as entirely missing instead.
                key_similarity_scores.append(0)
                value_similarity_scores.append(0)
                continue

            g_value = g_json[g_key]

            # Recursive comparison for nested objects, else compare values
            if isinstance(e_value, dict) and e_value and isinstance(g_value, dict):
                compare_json(g_value, e_value)
            elif isinstance(e_value, str) and isinstance(g_value, str):
                value_similarity_scores.append(sentence_similarity(e_value, g_value))
            elif e_value == g_value:
                value_similarity_scores.append(1)  # Exact match for non-string values
            else:
                value_similarity_scores.append(0)  # Non-matching values

    compare_json(generated_json, expected_json)

    # Compute the average similarity scores
    avg_key_similarity = sum(key_similarity_scores) / len(key_similarity_scores) if key_similarity_scores else 0
    avg_value_similarity = sum(value_similarity_scores) / len(value_similarity_scores) if value_similarity_scores else 0

    return (avg_key_similarity + avg_value_similarity) / 2

# Iterator

In [None]:
# Evaluation settings for the loop below.
GENERATION_SEED = 0             # fixes the sampling RNG so repeated runs give the same scores
MAX_NEW_TOKENS = 256            # explicit bound on the number of tokens generated per sample
PROMPT_END_MARKER = "[/INST]"   # closes the user turn; the reference answer follows it


def _extract_call_arguments(text):
    """
    Return the function-call arguments found in `text` as a dict, or None.

    None is returned when `text` cannot be parsed back into the data format, when it
    contains no function call, or when the arguments are not a valid JSON object.
    """
    try:
        parsed = parse_prompt_back_to_data(text)
        raw_arguments = parsed["target"]["chatgptMessage"]["function_call"]["arguments"]
        arguments = json.loads(raw_arguments)
    except (AttributeError, KeyError, ValueError):
        # AttributeError: the parser calls .group() on a failed regex search;
        # KeyError: no "function_call" entry; ValueError: malformed JSON.
        return None
    return arguments if isinstance(arguments, dict) else None


torch.manual_seed(GENERATION_SEED)

metric_scores = []
skipped_samples = 0
for data in dataset:
    expected_str = data["text"]

    # The metric compares function-call arguments, so samples whose reference answer
    # carries no usable function call cannot be scored.
    expected_arguments = _extract_call_arguments(expected_str)
    if expected_arguments is None:
        skipped_samples += 1
        continue

    # data["text"] also holds the reference answer (it is what expected_arguments was
    # parsed from). Feed the model only the part up to the end of the user turn so it
    # has to produce the call itself instead of being shown it.
    # NOTE(review): cuts at the first marker - confirm the samples are single-turn.
    prompt_head, marker, _ = expected_str.partition(PROMPT_END_MARKER)
    prompt_str = prompt_head + marker

    input_ids = fc_tokenizer(prompt_str, return_tensors="pt", truncation=True).input_ids.cuda()
    outputs = fc_model.generate(
        input_ids=input_ids,
        do_sample=True,
        top_p=0.9,
        temperature=0.9,
        max_new_tokens=MAX_NEW_TOKENS,
    )

    # The decoded sequence contains the prompt followed by the generated continuation.
    generated_str = fc_tokenizer.batch_decode(outputs.detach().cpu().numpy())[0]
    generated_arguments = _extract_call_arguments(generated_str)

    if generated_arguments is None:
        # Unparseable output or a missing function call counts as a complete miss
        # rather than aborting the whole evaluation.
        metric_score = 0.0
    else:
        metric_score = custom_metric(generated_arguments, expected_arguments)
    metric_scores.append(metric_score)

    print(f"Metric score: {metric_score:.2f}")

if skipped_samples:
    print(f"Skipped {skipped_samples} sample(s) without a parseable expected function call")

if metric_scores:
    print(f"Average metric score: {sum(metric_scores) / len(metric_scores):.2f}")
else:
    print("Average metric score: n/a (no samples were scored)")
