In [None]:
import os

# Input/output locations, given relative to the notebook's working directory.
data_path = "../../data/tool/test_demos.json"
result_path = "../../result/self_demos"
# File handed to OpenAIKey further down.
keys_file_path = "../../utils/raw_keys.txt"

# exist_ok=True is safe on re-runs and avoids the check-then-create race of
# testing os.path.exists() before calling makedirs().
os.makedirs(result_path, exist_ok=True)

# Prefix of every file this notebook writes into result_path.
suffix = "abla_wo_post_tool_gpt35"

## Load dataset

In [None]:
import json

# Decode explicitly as UTF-8, matching the other JSON reads in this notebook,
# so the result does not depend on the platform's default encoding.
with open(data_path, 'r', encoding='utf8') as f:
    raw_data = json.load(f)

# Number of loaded records (shown as the cell output).
len(raw_data)

In [None]:
# One flag per record, all starting at 0. A later cell sets a flag to 1 when no
# usable generated demo is left, and prompt building then uses the few-shot template.
skip_list = [0 for _ in range(len(raw_data))]

In [None]:
# Display the first record to inspect its structure.
raw_data[0]

In [None]:
from tqdm import tqdm
import json

def _format_call(action, action_input):
    """Render one call as ``action(key='text', key=value)``.

    String values are wrapped in single quotes; every other value is
    interpolated with its default ``str()`` form.
    """
    rendered_args = ', '.join(
        f"{key}='{value}'" if isinstance(value, str) else f"{key}={value}"
        for key, value in action_input.items()
    )
    return f"{action}({rendered_args})"


def _format_dict_call(action, action_input):
    """Render one call as ``action({'key':'text', 'key':value})``."""
    rendered_args = ', '.join(
        f"'{key}':'{value}'" if isinstance(value, str) else f"'{key}':{value}"
        for key, value in action_input.items()
    )
    return f"{action}(" + "{" + rendered_args + "})"


# Flatten every raw record into the fields the prompt templates and the
# evaluation consume.
data = []

for raw_item in tqdm(raw_data):
    function_descriptions = raw_item['Function_Description']

    item = {
        'Name': raw_item['Name'],
        'Description': raw_item['Description'],
        # One "name: description" line per function.
        'Specification': ''.join(
            f"{key}: {value}\n" for key, value in function_descriptions.items()
        ),
        'Function_list': list(function_descriptions),
    }

    # Seed demonstrations: a query line followed by the list of formatted calls.
    demo_blocks = []
    for demo in raw_item['Demonstration']:
        demo_calls = [
            # Action_Input is stored as a JSON string and must be decoded first.
            _format_call(ans["Action"], json.loads(ans["Action_Input"]))
            for ans in demo['Answer']
        ]
        demo_blocks.append(
            f"Query: {demo['Instruction']}\nFunction Calls: {demo_calls}\n"
        )
    item['Demonstration'] = ''.join(demo_blocks)

    item['Query'] = raw_item['Query']['Instruction']

    # Decode each gold answer once, then render it in both output formats.
    parsed_answers = [
        (ans["Action"], json.loads(ans["Action_Input"]))
        for ans in raw_item['Query']['Answer']
    ]
    item['Answer'] = [_format_call(action, args) for action, args in parsed_answers]
    item['AnswerDict'] = [_format_dict_call(action, args) for action, args in parsed_answers]

    data.append(item)

In [None]:
from utils.openai import OpenAIKey, create_response_chat

# Key helper built from the keys file; its process_error() is called on every
# failed request in the generation loop below.
openai_key = OpenAIKey(keys_file_path)
# Model identifier passed as the first argument to create_response_chat.
MODEL = "gpt-3.5-turbo"

In [None]:

# Previously generated step-2 output: one entry per record, and each entry is
# sampled from in the next cell.
step2_result_path = "../../dataset/tool/generate_demos/demos_9_step2_result.json"
with open(step2_result_path, 'r', encoding='utf8') as step2_file:
    raw_step2_result_list = json.load(step2_file)

len(raw_step2_result_list)

In [None]:
import random

# Keep at most DEMOS_PER_ITEM generated demos per record.
# - A dedicated, seeded generator makes the chosen subset reproducible after a
#   kernel restart; the module-level generator is unseeded.
# - min() keeps whatever is available when a record has fewer candidates,
#   where random.sample would otherwise raise ValueError.
DEMOS_PER_ITEM = 2
DEMO_SAMPLE_SEED = 0
demo_rng = random.Random(DEMO_SAMPLE_SEED)

raw_step2_result_list = [
    demo_rng.sample(demos, min(DEMOS_PER_ITEM, len(demos)))
    for demos in raw_step2_result_list
]


In [None]:
import re

# Cleaned demos per record, filled by the loop at the end of this cell.
step2_result_list = []

def extract_key_lines(text):
    """Reduce a generated demo to its query line and its function-call line.

    A value that sits on the line after its ``Query:`` / ``Function Calls:``
    label is first pulled back onto the label's line. Lines are then stripped
    and matched case-insensitively by prefix; query lines of 10 characters or
    fewer and function-call lines of 20 characters or fewer are ignored.

    Returns the two lines, each terminated by a newline, when exactly one of
    each kind remains; otherwise returns an empty string.
    """
    label_fixes = (
        ('Query:\n', 'Query: '),
        ('Query: \n', 'Query: '),
        ('Function Calls:\n', 'Function Calls: '),
        ('Function Calls: \n', 'Function Calls: '),
    )
    for broken, fixed in label_fixes:
        text = text.replace(broken, fixed)

    stripped = [piece.strip() for piece in text.split('\n')]

    queries = [
        entry for entry in stripped
        if re.match(r'^(query)', entry, re.IGNORECASE) and len(entry) > 10
    ]
    calls = [
        entry for entry in stripped
        if re.match(r'^(function call)', entry, re.IGNORECASE) and len(entry) > 20
    ]

    # Anything other than exactly one query and one call line is rejected.
    if len(queries) == 1 and len(calls) == 1:
        return queries[0] + '\n' + calls[0] + '\n'
    return ''

# Clean every sampled demo and keep the distinct non-empty results per record.
for i, sampled_demos in enumerate(raw_step2_result_list):
    cleaned = [extract_key_lines(demo) for demo in sampled_demos]

    # dict.fromkeys removes duplicates while keeping first-seen order.
    # set() ordering of strings changes between interpreter runs (hash
    # randomisation), which would reorder the demos inside the prompt.
    demo_candidate = list(dict.fromkeys(entry for entry in cleaned if entry != ''))

    if demo_candidate:
        step2_result_list.append(demo_candidate)
    else:
        # No usable demo: store a placeholder and flag the record so that
        # prompt building uses the few-shot template instead.
        step2_result_list.append(['None'])
        skip_list[i] = 1

In [None]:
# Number of records flagged as having no usable generated demo.
sum(skip_list)

In [None]:
# Save the per-record skip flags next to the other outputs of this run.
skip_path = os.path.join(result_path, f"{suffix}_step2_skip.json")
with open(skip_path, "w") as skip_file:
    json.dump(skip_list, skip_file, indent=4)

## Step 3: Without post-processing (ablation — step 2 output is used unchanged)

In [None]:
# No post-processing in this variant: the step-3 name refers to the very same
# list object as step2_result_list (an alias, not a copy).
raw_step3_result_list = step2_result_list

In [None]:
# Save the step-3 demos for this run.
step3_path = os.path.join(result_path, f"{suffix}_step3.json")
with open(step3_path, "w") as step3_file:
    json.dump(raw_step3_result_list, step3_file, indent=4)

In [None]:
import re

# One string per record: the literal 'None' for flagged records, otherwise all
# of that record's demos concatenated in list order.
step3_result_list = [
    'None' if skip_list[idx] == 1 else ''.join(raw_step3_result_list[idx])
    for idx in range(len(raw_step3_result_list))
]

## Step 4: Response Generation


In [None]:
# Prompt for records that have generated demos: the seed demonstrations are
# immediately followed by the generated ones, with no separator in between.
# Placeholders: tool_name, description, specification, seed_demonstration,
# checked_demonstration, query.
step4_template = """The {tool_name} API is used for {description}. In this task, you need to generate the function calls for a given query.

# Tool Specification:
{specification}
# Demonstration:
{seed_demonstration}{checked_demonstration}
# Instruction: Solve the following user query.
Query: {query}
Function calls: Give your answer in the format of ["function_name(parameter=value)"] here."""

In [None]:
# Fallback prompt for records flagged in skip_list: identical wording, but
# only the seed demonstrations are shown.
# Placeholders: tool_name, description, specification, seed_demonstration, query.
fewshot_template = """The {tool_name} API is used for {description}. In this task, you need to generate the function calls for a given query.

# Tool Specification:
{specification}
# Demonstration:
{seed_demonstration}
# Instruction: Solve the following user query.
Query: {query}
Function calls: Give your answer in the format of ["function_name(parameter=value)"] here."""

In [None]:
# Build one prompt per record, choosing the template from the skip flag.
prompt_list = []

for idx, record in enumerate(data):
    # Fields shared by both templates.
    shared_fields = dict(
        tool_name=record["Name"],
        description=record['Description'],
        specification=record['Specification'],
        seed_demonstration=record['Demonstration'],
        query=record['Query'],
    )

    if skip_list[idx] == 1:
        # No usable generated demo: fall back to the seed demonstrations only.
        prompt = fewshot_template.format(**shared_fields)
    else:
        prompt = step4_template.format(
            checked_demonstration=step3_result_list[idx],
            **shared_fields
        )

    prompt_list.append(prompt)

# Show the first prompt as a sanity check.
print(prompt_list[0])

In [None]:
# Query the model once per prompt, retrying a failed request up to
# max_attempts times before giving up on that prompt.
step4_result_list = []
max_attempts = 10

for idx in tqdm(range(len(prompt_list))):
    for attempt in range(1, max_attempts + 1):
        messages = [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": prompt_list[idx]}
        ]
        try:
            result = create_response_chat(
                MODEL,
                prompt_input=messages,
                max_tokens=512,
                temperature=0
            )
            step4_result_list.append(result)
            break
        except Exception as e:
            # Last attempt failed: record a placeholder so the results stay
            # aligned with prompt_list.
            if attempt == max_attempts:
                step4_result_list.append('None')
            # Every failure is handed to the key helper.
            openai_key.process_error(e)

In [None]:
# Save the raw model responses for this run.
step4_path = os.path.join(result_path, f"{suffix}_step4.json")
with open(step4_path, "w") as step4_file:
    json.dump(step4_result_list, step4_file, indent=4)

## Evaluation


In [None]:
# Reload the saved responses from disk, so evaluation can run without
# repeating the generation step.
saved_step4_path = os.path.join(result_path, f"{suffix}_step4.json")
with open(saved_step4_path, 'r', encoding='utf8') as saved_step4_file:
    result_list = json.load(saved_step4_file)
print(len(result_list))

In [None]:
from utils.evaluate import evaluate_tool_exact_output, evaluate_tool_part_output

# Score the responses against the prepared records with both metrics.
exact_accuracy = evaluate_tool_exact_output(result_list, data)
print(f"Exact Accuracy: {exact_accuracy}%")
part_accuracy = evaluate_tool_part_output(result_list, data)
print(f"Part Accuracy: {part_accuracy}%")