In [1]:
from modelzipper.tutils import *
from pprint import pprint
from tqdm import tqdm
from typing import List, Optional, Dict, Any, Tuple
import sys
import random
import json
import re

  from .autonotebook import tqdm as notebook_tqdm


[4m[36mModelZipper is ready for launch🚀 | Current Version🦄 >>> 0.2.7 <<< | AOE Time🕒 2024-11-20 22:30:38[0m


In [2]:
def calculate_f1(true, pred):
    """Compute set-based precision, recall and F1 between two collections.

    Both inputs are converted to sets, so duplicates are ignored and every
    element must be hashable.

    Args:
        true: Iterable of reference (golden) items.
        pred: Iterable of predicted items.

    Returns:
        A ``(precision, recall, f1)`` tuple. Any ratio whose denominator
        would be zero is reported as ``0.0``.
    """
    expected = set(true)
    predicted = set(pred)

    hits = len(expected & predicted)
    spurious = len(predicted - expected)
    missed = len(expected - predicted)

    precision = hits / (hits + spurious) if hits + spurious > 0 else 0.0
    recall = hits / (hits + missed) if hits + missed > 0 else 0.0

    # F1 is the harmonic mean; it is undefined when both terms are zero.
    if precision + recall > 0:
        f1 = 2 * (precision * recall) / (precision + recall)
    else:
        f1 = 0.0

    return precision, recall, f1

def evaluate_model_output(model_results, golden_results):
    """Score one sample's predicted API calls against its golden API calls.

    Predictions and golden calls are paired positionally with ``zip``, so
    predictions beyond ``len(golden_results)`` are ignored. Every metric is
    averaged over ``len(golden_results)``, which means missing predictions
    count as zero.

    Args:
        model_results: List of predicted calls. Each is a dict with
            ``'api_id'`` (convertible with ``int``), ``'api_name'`` and
            ``'call_parameter'`` (a list of dicts; the first key/value of
            each dict is taken as the parameter name/value).
        golden_results: List of golden calls. Each is a dict with
            ``'api_id'``, ``'api_name'`` and ``'call_parameter'`` (a dict
            mapping parameter name to value).

    Returns:
        Dict with the keys ``api_name_recall``, ``api_id_accuracy``,
        ``param_name_precision``, ``param_name_recall``, ``param_name_f1``,
        ``value_precision``, ``value_recall`` and ``value_f1``. All values
        are ``0.0`` when ``golden_results`` is empty.
    """
    metric_names = (
        "api_name_recall",
        "api_id_accuracy",
        "param_name_precision",
        "param_name_recall",
        "param_name_f1",
        "value_precision",
        "value_recall",
        "value_f1",
    )
    totals = dict.fromkeys(metric_names, 0.0)

    num_samples = len(golden_results)
    if num_samples == 0:
        # Nothing to average over; avoid dividing by zero.
        return totals

    # Loop-invariant: build the golden name lookup once instead of per pair.
    golden_api_names = {g['api_name'] for g in golden_results}

    for model, golden in zip(model_results, golden_results):
        # API name counts as recalled if it appears anywhere in the golden list.
        if model['api_name'] in golden_api_names:
            totals["api_name_recall"] += 1

        # API id must match the golden call at the same position.
        totals["api_id_accuracy"] += int(int(model['api_id']) == golden['api_id'])

        # Parameter names: predicted list-of-dicts vs. golden dict keys.
        model_param_names = [list(param.keys())[0] for param in model['call_parameter']]
        golden_param_names = list(golden['call_parameter'].keys())
        p_precision, p_recall, p_f1 = calculate_f1(golden_param_names, model_param_names)
        totals["param_name_precision"] += p_precision
        totals["param_name_recall"] += p_recall
        totals["param_name_f1"] += p_f1

        # Parameter values are compared as sets, independent of their names.
        model_param_values = [list(param.values())[0] for param in model['call_parameter']]
        golden_param_values = list(golden['call_parameter'].values())
        v_precision, v_recall, v_f1 = calculate_f1(golden_param_values, model_param_values)
        totals["value_precision"] += v_precision
        totals["value_recall"] += v_recall
        totals["value_f1"] += v_f1

    return {name: totals[name] / num_samples for name in metric_names}

def evaluate_all_samples(all_model_results, all_golden_results):
    """Average the per-sample metrics of ``evaluate_model_output`` over a dataset.

    Samples are paired positionally with ``zip``; if the two lists differ in
    length the extra entries of the longer one are not scored. The
    denominator is always ``len(all_golden_results)``.

    Args:
        all_model_results: List of per-sample prediction dicts; each must
            contain a ``'pred_param'`` list of predicted calls.
        all_golden_results: List of per-sample golden call lists.

    Returns:
        Dict with the keys ``api_name_recall``, ``api_id_accuracy``,
        ``param_name_precision``, ``param_name_recall``, ``param_name_f1``,
        ``value_precision``, ``value_recall`` and ``value_f1``. All values
        are ``0.0`` when ``all_golden_results`` is empty.
    """
    metric_names = (
        "api_name_recall",
        "api_id_accuracy",
        "param_name_precision",
        "param_name_recall",
        "param_name_f1",
        "value_precision",
        "value_recall",
        "value_f1",
    )
    totals = dict.fromkeys(metric_names, 0.0)

    num_samples = len(all_golden_results)
    if num_samples == 0:
        # No samples to average over; avoid dividing by zero.
        return totals

    for model_results, golden_results in zip(all_model_results, all_golden_results):
        # Score a single sample, then fold its metrics into the running totals.
        evaluation_result = evaluate_model_output(model_results['pred_param'], golden_results)
        for name in metric_names:
            totals[name] += evaluation_result[name]

    overall_results = {name: totals[name] / num_samples for name in metric_names}
    return overall_results

# Example usage: sanity-check both evaluators on a hand-written sample.
print("test simgle sample generated result")

# Predictions carry string ids and a list of single-entry {param: value} dicts.
model_results = dict(
    plan=None,
    pred_param=[
        dict(api_id='0', api_name='patreon', call_parameter=[dict(username='username')]),
        dict(api_id='1', api_name='minecraft', call_parameter=[dict(username='username')]),
    ],
)

# Golden calls carry integer ids and a plain {param: value} dict.
golden_results = [
    dict(api_name='patreon', call_parameter=dict(username='username'), api_id=0),
    dict(api_name='minecraft', call_parameter=dict(username='username'), api_id=1),
]

evaluation_result = evaluate_model_output(model_results['pred_param'], golden_results)
print(evaluation_result)

print("test all sample generated results")

# Example usage with multiple samples
all_model_results = [model_results] * 2

# NOTE: only one golden sample is listed for two predictions; evaluate_all_samples
# pairs them with zip, so only the first prediction is scored.
all_golden_results = [
    [
        dict(api_name='patreon', call_parameter=dict(username='username'), api_id=0),
        dict(api_name='minecraft', call_parameter=dict(username='username'), api_id=1),
    ],
    # Add corresponding golden samples here
]

overall_evaluation_result = evaluate_all_samples(all_model_results, all_golden_results)
print(overall_evaluation_result)


test simgle sample generated result
{'api_name_recall': 1.0, 'api_id_accuracy': 1.0, 'param_name_precision': 1.0, 'param_name_recall': 1.0, 'param_name_f1': 1.0, 'value_precision': 1.0, 'value_recall': 1.0, 'value_f1': 1.0}
test all sample generated results
{'api_name_recall': 1.0, 'api_id_accuracy': 1.0, 'param_name_precision': 1.0, 'param_name_recall': 1.0, 'param_name_f1': 1.0, 'value_precision': 1.0, 'value_recall': 1.0, 'value_f1': 1.0}


In [3]:
def extract_parameters(s):
    """Parse a tagged model response into a plan and a list of API calls.

    The response must contain a ``<PLAN>...</PLAN>`` and an
    ``<ANSWER>...</ANSWER>`` section. Inside the answer, each
    ``<API_n>name</API_n>`` tag opens a block that extends up to the next
    ``<API_n>`` tag (or the end of the answer), and every
    ``<PARAM>p</PARAM><VALUE>v</VALUE>`` pair found in that block becomes one
    ``{p: v}`` entry. API names, parameter names and values are matched
    without ``re.DOTALL``, so they cannot span multiple lines.

    Args:
        s: Raw model output text.

    Returns:
        ``{"plan": [line, ...], "pred_param": [{"api_id": str,
        "api_name": str, "call_parameter": [{param: value}, ...]}, ...]}``,
        or ``None`` when either section is missing or any exception occurs
        while parsing (e.g. ``s`` is not a string).
    """
    try:
        plan_section = re.search(r"<PLAN>(.*?)</PLAN>", s, re.DOTALL)
        answer_section = re.search(r"<ANSWER>(.*?)</ANSWER>", s, re.DOTALL)
        if not (plan_section and answer_section):
            return None

        # One API tag plus everything up to the next API tag / end of answer.
        block_re = re.compile(r"(<API_\d+>.*?</API_\d+>.*?)(?=<API_\d+>|$)", re.DOTALL)
        # Numeric id from the opening tag and the name between the tags.
        header_re = re.compile(r"<API_(\d+)>\s*(.*?)\s*</API_\d+>")
        # A PARAM tag immediately followed by its VALUE tag.
        pair_re = re.compile(r"<PARAM>\s*(.*?)\s*</PARAM>\s*<VALUE>\s*(.*?)\s*</VALUE>")

        plan = plan_section.group(1).strip().split('\n')

        pred_param = []
        for block in block_re.findall(answer_section.group(1)):
            header = header_re.search(block)
            if header is None:
                # No single-line API header in this block; skip it.
                continue
            pred_param.append({
                "api_id": header.group(1),
                "api_name": header.group(2),
                "call_parameter": [
                    {name: value} for name, value in pair_re.findall(block)
                ],
            })

        return {"plan": plan, "pred_param": pred_param}
    except Exception:
        # Best-effort parser: any failure is reported as "nothing extracted".
        return None

In [None]:
# multiple_api
# The name below first holds the dataset path and is then rebound to the parsed JSON.
rapid_multiple_api = "/data/zecheng/lcm_stack/dataset/processed_dataset/rapid_multiple_api/rapid_multiple_api.json"

with open(rapid_multiple_api, "r") as f:
    rapid_multiple_api = json.load(f)

preds_rapid_multiple_api = auto_read_data("/data/zecheng/lcm_stack/dataset/inference_results/llama-3_1-8B-Instruct/sft_stage_1/preds_rapid_multiple_api.jsonl")

# Group predictions by bucket; keys are pre-created as bucket_0 .. bucket_{n-1},
# so every item's 'bucket_id' must be one of those strings.
num_buckets = len({item['bucket_id'] for item in preds_rapid_multiple_api})
bucket_pred_rapid_multiple_api = {f'bucket_{i}': [] for i in range(num_buckets)}

for item in preds_rapid_multiple_api:
    bucket_pred_rapid_multiple_api[item["bucket_id"]].append(item)

# These accumulators are initialised here but not updated anywhere in this cell.
total_f1_scores = {'api_name': [], 'param_name': [], 'param_value': []}
total_avg_f1_scores = {'api_name': 0, 'param_name': 0, 'param_value': 0}
total_cases, num_calculate_cases = 0, 0
overall_success = True  # Track overall success across all cases

success_query, failed_query = [], []
all_model_preds, all_golden_results = [], []
next_round_inputs_collector = []

for bucket_id, bucket_outputs in bucket_pred_rapid_multiple_api.items():
    for cnt, output in enumerate(bucket_outputs):
        # The cnt-th prediction of a bucket is matched to the cnt-th golden entry.
        golden_bucket = rapid_multiple_api[bucket_id]
        golden_query = golden_bucket['query'][cnt]
        golden_parameter = golden_bucket['call_parameters'][cnt]
        api_index_dict = golden_bucket['api_index_dict']

        # Attach the golden API id to each golden call (mutates the loaded data in place).
        for tmp in golden_parameter:
            golden_api_id = api_index_dict[tmp['api_name']]
            tmp['api_id'] = golden_api_id

        pred, query = output['pred'], output['query']
        total_cases += 1

        # Only score predictions whose query text matches the golden query.
        if golden_query != query:
            continue
        num_calculate_cases += 1

        model_preds = extract_parameters(pred)
        if model_preds is None:
            model_preds = {'plan': [], 'pred_param': []}  # Failure to extract parameters

        all_model_preds.append(model_preds)
        all_golden_results.append(golden_parameter)

all_res = evaluate_all_samples(all_model_preds, all_golden_results)
pprint(all_res)
print(f"Total cases: {total_cases}, Num calculated cases: {num_calculate_cases}")
print(f"Num collected next round input: {len(next_round_inputs_collector)}")