In [None]:
# input: 1. the 30k+ composed intents, each mapped to its own set of subtasks
# output: 1. one JSON file per intent under the task_DAG_batch1 directory
# ----------------------------------------------------------------------------------------

from ant_api.utils import (
    get_default_config,
    ask_chatgpt
)
import pandas as pd
import json
import os
import concurrent.futures
import threading
from tqdm import tqdm

# 10292 records were submitted for annotation at the time; they are spread
# over three JSON exports that are concatenated into one frame below.
df1 = pd.read_json(r'C:\Users\brindle\Desktop\existing_task.json', encoding='utf-8')
df2 = pd.read_json(r'C:\Users\brindle\Desktop\existing_task2.json', encoding='utf-8')
df3 = pd.read_json(r'C:\Users\brindle\Desktop\existing_task3.json', encoding='utf-8')
df = pd.concat([df1, df2, df3])
# 10125 of those records were annotated as meaningful (kept for reference):
# annotated_df = pd.read_csv('D:/data/meaningful_intents.csv')

# Turn the combined frame into a list with one dict per record; from here on
# `df` is that list, not a DataFrame.
df = df.to_dict(orient='records')

# Load all_task.json (the subtask catalogue). The context manager closes the
# file handle deterministically instead of leaving it to the garbage collector.
with open('D:/data/all_task.json', 'r', encoding='utf-8') as subtasks_file:
    subtasks = json.load(subtasks_file)

# Index the subtasks by function name for direct lookup in process_task.
dic = {subtask["function_name"]: subtask for subtask in subtasks}

# System prompt sent with every request. Built once at import time instead of
# on every call.
_SYSTEM_PROMPT = """You are an AI assistant that helps compose tasks from subtasks based on user intent. Your goal is to create a directed acyclic graph (DAG) of subtasks that together accomplish the user's intended task.
    For each subtask provided:
    - It has required_resources (input parameters) and produced_resources (output)
    - It maps to a Python function with the same parameter types and return types
    - An edge exists from subtask A to B if A's produced_resources contains any of B's required_resources
    - Each subtask must be used exactly once
    - It belongs to a specific application that it operates within

    Your job is to:
    1. Analyze the user's intent
    2. Examine the provided subtasks and their dependencies
    3. Create a DAG by connecting subtasks based on their resource requirements, preferring parallel branches over long sequential chains when possible
    4. Generate Python code that:
        - Instantiates the subtask templates with actual parameters
        - Calls the functions in the correct order based on dependencies
        - Properly passes produced resources as parameters

    IMPORTANT GUIDELINES:
    - Prefer parallel execution branches over long sequential chains when possible
    - Look for opportunities to execute subtasks concurrently rather than sequentially
    - Only create sequential dependencies when absolutely necessary based on resource requirements
    - Consider if subtasks can be executed independently of each other

    Your response must be a valid JSON object with EXACTLY the following format, and NO additional text or explanation:
    {
        "task_instruction": "The complete instantiated task description with concrete parameter values. For parallel subtasks, use phrases like 'meanwhile', 'at the same time', 'in parallel', 'simultaneously' to indicate they can be executed concurrently. EVERY action must explicitly state which application it is performed in (e.g. 'In Visual Studio Code, create a new file', 'In File Explorer, navigate to the folder', 'In Chrome, open the webpage')",
        "code": "The Python code that executes the composed task",
        "dag": {
            "nodes": ["469fe98c-94ca-d399-af19-025b390105ee", "a1b2c3d4-e5f6-7890-abcd-ef1234567890", ...], # Each element must be a valid UUID corresponding to subtask["id"]
            "edges": {
                "469fe98c-94ca-d399-af19-025b390105ee": ["a1b2c3d4-e5f6-7890-abcd-ef1234567890", "d4c3b2a1-6f5e-0987-dcba-0987654321fe"],
                "a1b2c3d4-e5f6-7890-abcd-ef1234567890": [],
                ...
            }
        }
    }

    Requirements:
    - The task_instruction must include all concrete parameter values, not templates
    - The code must reflect the DAG structure through function call dependencies
    - The DAG must be represented as an adjacency list where each key in "edges" is a subtask id (UUID) pointing to an array of dependent subtask ids (UUIDs)
    - The "nodes" list in the DAG must contain only valid UUID strings corresponding to each subtask's "id"
    - The task_instruction MUST ALWAYS explicitly specify which application each operation is performed in (e.g. "In Visual Studio Code, create a new file" rather than just "Create a new file")
    - Maximize parallel execution paths when possible while respecting resource dependencies
    - DO NOT include any explanatory text or comments outside the JSON structure
"""

# Subtasks of the worked example used as the few-shot user turn.
_EXAMPLE_SUBTASKS = [
    {'id': '469fe98c-94ca-d399-af19-025b390105ee', 'instruction_template': 'Create a markdown cell in Jupyter notebook.', 'application': 'Visual Studio Code', 'available_parameters': [{}], 'OS': 'Windows', 'required_resources': ['null'], 'produced_resources': ['markdown_cell'], 'function': 'def create_markdown_cell() -> markdown_cell:\n    pass', 'function_name': 'create_markdown_cell'},
    {'id': 'f445b2b1-b13f-59f7-ace2-8905a416f421', 'instruction_template': "Select the markdown cell '{markdown_cell_num}' in Jupyter notebook, and input '{text}'.", 'application': 'Visual Studio Code', 'available_parameters': [{'markdown_cell_num': '1', 'text': 'Hello, world!'}], 'OS': 'Windows', 'required_resources': ['markdown_cell', 'text_in_mind'], 'produced_resources': ['markdown_cell', 'text_in_mind'], 'function': 'def edit_markdown_cell(markdown_cell: markdown_cell, text: text_in_mind, markdown_cell_num: str) -> tuple[markdown_cell, text_in_mind]:\n    pass', 'function_name': 'edit_markdown_cell'},
    {'id': 'c54cb0d8-d115-6cff-9fbb-82919d4dcb42', 'instruction_template': "Using the file explorer, navigate to '{dir_path}' and copy '{file_name}'.", 'application': 'File Explorer', 'available_parameters': [{'dir_path': 'C:\\Users\\user\\Desktop\\image\\', 'file_name': 'dog.jpg'}], 'OS': 'Windows', 'required_resources': ['file_path'], 'produced_resources': ['file_path', 'file_in_clipborad'], 'function': 'def navigate_and_copy_file(file_path: file_path, dir_path: str, file_name: str) -> tuple[file_path, file_in_clipborad]:\n    pass', 'function_name': 'navigate_and_copy_file'},
    {'id': '4c83c8ca-f47a-1d29-ddc5-0fb5ce934fe1', 'instruction_template': "Select the markdown cell '{markdown_cell_num}' in Jupyter notebook, and paste the image from the clipboard.", 'application': 'Visual Studio Code', 'available_parameters': [{'markdown_cell_num': '1'}], 'OS': 'Windows', 'required_resources': ['markdown_cell', 'img_in_clipboard'], 'produced_resources': ['markdown_cell', 'img_in_clipboard'], 'function': 'def paste_image_to_markdown_cell(markdown_cell: markdown_cell, img_in_clipboard: img_in_clipboard, markdown_cell_num: str) -> tuple[markdown_cell, img_in_clipboard]:\n    pass', 'function_name': 'paste_image_to_markdown_cell'},
]

# Expected answer for the worked example, used as the few-shot assistant turn.
_EXAMPLE_RESPONSE = {
    "task_instruction": "Create a markdown cell in Jupyter notebook. Then, select the markdown cell '1' and input 'Hello, world!'. Meanwhile, navigate to 'C:\\Users\\user\\Desktop\\image\\' and copy 'dog.jpg'. Finally, select the markdown cell '1' and paste the image from the clipboard.",
    "code": "def execute_task():\n    # Step 1: Create a markdown cell\n    markdown_cell = create_markdown_cell()\n\n    # Step 2: Edit the markdown cell with text\n    markdown_cell, text_in_mind = edit_markdown_cell(markdown_cell, 'Hello, world!', '1')\n\n    # Step 3: Navigate to directory and copy the image file\n    file_path, file_in_clipborad = navigate_and_copy_file('C:\\\\Users\\\\user\\\\Desktop\\\\image\\\\dog.jpg', 'C:\\\\Users\\\\user\\\\Desktop\\\\image\\\\', 'dog.jpg')\n\n    # Step 4: Paste the image into the markdown cell\n    markdown_cell, img_in_clipboard = paste_image_to_markdown_cell(markdown_cell, file_in_clipborad, '1')\n\nexecute_task()",
    "dag": {
        "nodes": [
            "469fe98c-94ca-d399-af19-025b390105ee",
            "f445b2b1-b13f-59f7-ace2-8905a416f421",
            "c54cb0d8-d115-6cff-9fbb-82919d4dcb42",
            "4c83c8ca-f47a-1d29-ddc5-0fb5ce934fe1",
        ],
        "edges": {
            "469fe98c-94ca-d399-af19-025b390105ee": [
                "f445b2b1-b13f-59f7-ace2-8905a416f421",
                "4c83c8ca-f47a-1d29-ddc5-0fb5ce934fe1",
            ],
            "f445b2b1-b13f-59f7-ace2-8905a416f421": [],
            "c54cb0d8-d115-6cff-9fbb-82919d4dcb42": [
                "4c83c8ca-f47a-1d29-ddc5-0fb5ce934fe1",
            ],
            "4c83c8ca-f47a-1d29-ddc5-0fb5ce934fe1": [],
        },
    },
}


def _strip_code_fence(text):
    """Return *text* without a surrounding Markdown code fence, if it has one."""
    stripped = text.strip()
    if not stripped.startswith("```"):
        return stripped
    body = stripped[3:]
    if body.endswith("```"):
        body = body[:-3]
    # Skip an optional language tag (e.g. "json") that directly follows the
    # opening fence; a JSON object itself starts with "{", never a letter.
    start = 0
    while start < len(body) and body[start].isalpha():
        start += 1
    return body[start:].strip()


def _build_messages(intent, t_subtasks):
    """Build the chat messages: system prompt, one worked example, the real query."""
    return [
        {
            "role": "system", "content": _SYSTEM_PROMPT
        },
        {
            "role": "user", "content": [
                {"type": "text", "text": "Intent: Create a markdown document with image"},
                {"type": "text", "text": f"Subtasks: {_EXAMPLE_SUBTASKS}"}
            ]
        },
        {
            # Serialize the example as real JSON. Formatting the dict with an
            # f-string would show the model a single-quoted Python repr, which
            # contradicts the "valid JSON" requirement of the system prompt.
            "role": "assistant", "content": json.dumps(_EXAMPLE_RESPONSE, ensure_ascii=False)
        },
        {
            "role": "user", "content": [
                {"type": "text", "text": f"Intent: {intent}"},
                {"type": "text", "text": f"Subtasks: {t_subtasks}"}
            ]
        },
    ]


def process_task(task_index, task, max_retries=None):
    """Compose one task's subtasks into a DAG via the chat model and save it.

    Args:
        task_index: Index of the task; used in log messages and as the output
            file name (``<task_index>.json``).
        task: Task record providing ``'task_goal'`` (the intent) and
            ``'functions'`` (function names looked up in the module-level
            ``dic``).
        max_retries: Maximum number of request attempts. ``None`` (the default)
            retries until a parseable JSON object is received.

    Returns:
        None. On success the parsed response, extended with ``"task_intent"``,
        is written to the output directory. Nothing is written when the
        response has no DAG nodes or when ``max_retries`` is exhausted.

    Raises:
        KeyError: If the task lacks a required key or names a function that
            is not present in ``dic``.
    """
    intent = task['task_goal']
    t_subtasks = [dic[function_name] for function_name in task['functions']]

    param = get_default_config(model="gpt-4o")
    param["queryConditions"]["model"] = "gpt-4o"
    param["queryConditions"]["temperature"] = "0.6"
    param["queryConditions"]["messages"] = _build_messages(intent, t_subtasks)

    attempt = 0
    while True:
        attempt += 1
        try:
            response = json.loads(_strip_code_fence(ask_chatgpt(param)))
            # A JSON array or scalar parses fine but cannot be used below;
            # treat it like any other malformed reply and retry.
            if not isinstance(response, dict):
                raise ValueError(f"expected a JSON object, got {type(response).__name__}")
            break
        except Exception as e:
            # Deliberately broad: request failures and malformed replies are
            # both handled by asking again.
            print(f"Error in task {task_index}: {e}")
            if max_retries is not None and attempt >= max_retries:
                print(f"Giving up on task {task_index} after {attempt} attempts")
                return

    # Skip if DAG has no nodes
    dag = response.get("dag")
    if not isinstance(dag, dict) or not dag.get("nodes"):
        return

    output_file = f"D:/data/[explicit_app]task_DAG_batch1/{task_index}.json"
    # Make sure the target directory exists; exist_ok keeps this safe when
    # several workers reach this point at once.
    os.makedirs(os.path.dirname(output_file), exist_ok=True)
    response["task_intent"] = intent
    # Save the response to a JSON file
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(response, f, indent=4, ensure_ascii=False)

# Process the tasks concurrently with a thread pool.
# Task indices that already have a JSON file in the output directory are
# skipped, so an interrupted run can be resumed. A set keeps the per-task
# membership test O(1) instead of scanning a list for every task.
processed_tasks = {
    int(file_name.split('.')[0])
    for file_name in os.listdir('D:/data/[explicit_app]task_DAG_batch1')
    if file_name.endswith('.json') and file_name.split('.')[0].isdigit()
}
with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
    futures = {
        executor.submit(process_task, i, task): i
        for i, task in enumerate(df)
        if i not in processed_tasks
    }
    for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures), desc="Processing tasks"):
        try:
            # result() re-raises whatever the worker raised; without this call
            # a failed task would disappear without any trace.
            future.result()
        except Exception as e:
            print(f"Task {futures[future]} failed: {e!r}")

Processing tasks: 100%|██████████| 3/3 [00:00<?, ?it/s]
