In [None]:
from typing import Dict, List, Literal

# Closed set of benchmark dataset names, expressed as a typing.Literal.
DatasetType = Literal[
    "HumanEval",
    "MBPP",
    "GSM8K",
    "MATH",
    "HotpotQA",
    "DROP",
    "MMLU",
]

In [None]:
# Set-up for remapping workflow nodes onto operator code; the workflow logic itself is not fully implemented here.

import os, sys, json

sys.path.append(os.path.abspath("./baselines/FLORA_Bench/"))

import numpy as np

from utils.file_io import read_jsonl_file, add_jsonl_file
from sklearn.model_selection import train_test_split
from baselines.FLORA_Bench.scripts.optimize.extract_workflow import *


# Benchmark domain to process; it is interpolated into the dataset path below.
domain = "MathAF"
# Load the domain's records from its JSONL file via utils.file_io.read_jsonl_file
# (presumably a list of dicts, one per line -- TODO confirm against the helper).
dataset = read_jsonl_file(f"benchmarks/FLORA_Bench/{domain}/dataset.jsonl")
# Pick one record at random from the loaded dataset.
# NOTE(review): the draw is unseeded, so it differs between runs, and the name
# `data` is rebound by the `for data in dataset` loop in the next cell.
data = np.random.choice(dataset)

# Header of a generated workflow module. Placeholders: {domain} and {round}
# select the imported operator/prompt packages, {graph} receives the graph
# source. The final empty entry keeps the trailing newline after {graph}.
WORKFLOW_TEMPLATE = "\n".join(
    [
        "from typing import Literal",
        "import workplace.{domain}.workflows.template.operator as operator",
        "import workplace.{domain}.workflows.round_{round}.prompt as prompt_custom",
        "from metagpt.provider.llm_provider_registry import create_llm_instance",
        "from metagpt.utils.cost_manager import CostManager",
        "",
        'DatasetType = Literal["HumanEval", "MBPP", "GSM8K", "MATH", "HotpotQA", "DROP", "MMLU"]',
        "",
        "{graph}",
        "",
    ]
)

# Every predefined operator is described by three strings:
#   *_PROMPT  leading sentence associated with the operator,
#   *_OPS     str.format template of the statement that awaits the operator,
#   *_INIT    statement that instantiates the operator on `self`.

# --- Test ---------------------------------------------------------------
TEST_PROMPT = (
    "Given a code problem and a python code solution which failed to pass test or execute, "
    "you need to analyze the reason for the failure and propose a better code solution"
)
TEST_OPS = (
    "test_response = await self.test("
    "problem={problem}, solution={solution}, entry_point={entry_point})"
)
TEST_INIT = "self.test = operator.Test(self.llm)"

# --- AnswerGenerate -----------------------------------------------------
ANSWER_GENERATE_PROMPT = "Think step by step and solve the problem"
ANSWER_GENERATE_OPS = "answer_response = await self.answer_generate(input={input})"
ANSWER_GENERATE_INIT = "self.answer_generate = operator.AnswerGenerate(self.llm)"

# --- CustomCodeGenerate -------------------------------------------------
CUSTOM_CODE_PROMPT = "Fill CodeBlock"
CUSTOM_CODE_OPS = (
    "code_response = await self.custom_code_generate("
    "problem={problem}, entry_point={entry_point}, instruction={instruction})"
)
CUSTOM_CODE_INIT = "self.custom_code_generate = operator.CustomCodeGenerate(self.llm)"

# --- ScEnsemble (solutions only) ----------------------------------------
SC_PROMPT = "Several answers have been generated to a same question"
SC_OPS = "ensemble_response = await self.sc_ensemble(solutions={solutions})"
SC_INIT = "self.sc_ensemble = operator.ScEnsemble(self.llm)"

# --- ScEnsemble (solutions plus the problem statement) ------------------
SC_EXT_PROMPT = "Several answers have been generated to a same question"
SC_EXT_OPS = (
    "ensemble_response = await self.sc_ensemble("
    "solutions={solutions}, problem={problem})"
)
SC_EXT_INIT = "self.sc_ensemble = operator.ScEnsemble(self.llm)"

# --- Programmer ---------------------------------------------------------
PROGRAMMER_PROMPT = "You are a professional Python programmer"
PROGRAMMER_OPS = (
    "programmer_response = await self.programmer("
    "problem={problem}, analysis={analysis})"
)
PROGRAMMER_INIT = "self.programmer = operator.Programmer(self.llm)"


# The five parallel tables below share one index per predefined operator
# (Test, AnswerGenerate, CustomCodeGenerate, ScEnsemble, Programmer).
# Only the self-consistency entries depend on the domain: CodingAF and MathAF
# take the SC_EXT_* strings, every other domain takes the plain SC_* ones.
_ext_ensemble = domain in ["CodingAF", "MathAF"]

AGENT_PROMPTS = [
    TEST_PROMPT,
    ANSWER_GENERATE_PROMPT,
    CUSTOM_CODE_PROMPT,
    SC_EXT_PROMPT if _ext_ensemble else SC_PROMPT,
    PROGRAMMER_PROMPT,
]

AGENT_INIT = [
    TEST_INIT,
    ANSWER_GENERATE_INIT,
    CUSTOM_CODE_INIT,
    SC_EXT_INIT if _ext_ensemble else SC_INIT,
    PROGRAMMER_INIT,
]

# Name of the variable each operator's response is bound to.
AGENT_OPS = [
    "test_response",
    "answer_response",
    "code_response",
    "ensemble_response",
    "programmer_response",
]

# Source file holding each operator's implementation.
OPS_CLASS = [
    "Test.py",
    "AnswerGenerate.py",
    "CustomCodeGenerate.py",
    # NOTE(review): the domains that use SC_EXT_* get "ScEnsemble.py" while the
    # others get "ScEnsembleExt.py"; this looks swapped -- confirm against the
    # operator files before changing.
    "ScEnsemble.py" if _ext_ensemble else "ScEnsembleExt.py",
    "Programmer.py",
]

# Human-readable description of each operator.
OPS_DESC = [
    "Tests the solution using public test cases. If the solution fails, it reflects on the errors and attempts to modify the solution. Returns True and the solution if all tests pass after modifications. Returns False and the current solution if it still fails after modifications.",
    "Generate step by step based on the input. The step by step thought process is in the field of 'thought', and the final answer is in the field of 'answer'.",
    "Generates code based on customized input and instruction.",
    "Uses self-consistency to select the solution that appears most frequently in the solution list, improve the selection to enhance the choice of the best solution.",
    "Automatically writes, executes Python code, and returns the solution based on the provided problem description and analysis. The `output` only contains the final answer. If you want to see the detailed solution process, it's recommended to retrieve the `code`.",
]

In [None]:
# View 1: Individual System Prompt [x] -> original
#
# For every record in `dataset` this cell adds four fields:
#   operator_nodes  node id -> source text of the operator file for that node   (View 2)
#   full_prompts    one "<Operator> Agent: <description>" line per non-root node (View 3)
#   workflow_code   template header + operator initialisers + __call__ body     (View 4)
#   code_nodes      node id -> the single statement generated for that node     (View 5)

# The empty graph skeleton is the same for every record, so read it once.
with open("empty_workflow.py") as f:
    empty_graph = f.read()

for data in dataset:
    nodes, edges = data["nodes"], data["edge_index"]
    data['operator_nodes'] = {idx: '' for idx in nodes} # View 2: Individual Operator Code [x] -> ours
    data['full_prompts'] = '' # View 3: Global System Prompt + Operator Descriptions [x] -> ours
    init_code = ""
    init_ops = []
    implement_code = ""
    # implement_ops[i] is the response-variable name of the i-th node in
    # iteration order; slot 0 is the root node, which produces no variable.
    implement_ops = ['']
    for idx, inst in nodes.items():
        if idx == "0":
            # Root node: emit the module header and open the __call__ signature.
            init_code = WORKFLOW_TEMPLATE.format(graph=empty_graph, round=1, domain=domain)
            if domain == "CodingAF":
                implement_code = (
                    "async def __call__(self, problem: str, entry_point: str):\n"
                )
            else:
                implement_code = "async def __call__(self, problem: str):\n"
            continue

        # A node is a predefined operator when its first sentence equals one of
        # AGENT_PROMPTS; anything else is a custom agent. Only the failed lookup
        # is caught: a missing operator file must surface as an error instead of
        # silently relabelling the node and appending to the lists twice.
        try:
            agent = AGENT_PROMPTS.index(inst.split(".")[0].strip())
        except ValueError:
            agent = None

        if agent is None:
            # custom agents
            data['full_prompts'] += "Custom Agent: Generates anything based on customized input and instruction.\n"
            init_ops.append("\tself.custom = operator.Custom(self.llm)")
            implement_ops.append("custom_response")
            operator_file = "operator_files/Custom.py"
        else:
            # predefined agents
            init_ops.append(f"\t{AGENT_INIT[agent]}")
            implement_ops.append(AGENT_OPS[agent])
            data['full_prompts'] += f"{OPS_CLASS[agent].split('.')[0]} Agent: {OPS_DESC[agent]}\n"
            operator_file = "operator_files/" + OPS_CLASS[agent]

        # load operator file for EACH node as in the system prompt
        with open(operator_file) as f:
            data['operator_nodes'][str(idx)] = f.read()

    implement_flows = {idx: '\t# Implementation of the workflow' for idx in nodes} # View 5: Individual Workflow Code [x] -> ours

    for inp, res in edges:
        in_ops_type = implement_ops[inp].strip()
        ops_type = implement_ops[res].strip()

        # An edge leaving the root feeds the raw problem; any other edge feeds
        # the upstream operator's response.
        if inp > 0:
            upstream = f"{in_ops_type}['response']"
            # NOTE(review): `candidates` is never assigned in the generated code
            # visible here -- confirm where it is expected to come from.
            solutions = "candidates"
        else:
            upstream = "problem"
            solutions = "problem"

        if ops_type == "test_response":
            call = TEST_OPS.format(problem="problem", solution=upstream, entry_point="entry_point")
        elif ops_type == "answer_response":
            call = ANSWER_GENERATE_OPS.format(input=upstream)
        elif ops_type == "code_response":
            # json.dumps yields a double-quoted, escaped literal, so quotes or
            # newlines in the instruction cannot break the generated statement.
            instruction = json.dumps(nodes[str(res)].strip(), ensure_ascii=False)
            call = CUSTOM_CODE_OPS.format(problem="problem", entry_point="entry_point", instruction=instruction)
        elif ops_type == "ensemble_response" and domain in ['MathAF', 'CodingAF']:
            call = SC_EXT_OPS.format(solutions=solutions, problem="problem")
        elif ops_type == "ensemble_response":
            call = SC_OPS.format(solutions=solutions)
        elif ops_type == "programmer_response":
            call = PROGRAMMER_OPS.format(problem="problem", analysis=upstream)
        else:
            # Custom agent: repr() escapes the instruction the same way while
            # keeping single quotes for ordinary text.
            instruction = nodes[str(res)].strip()
            call = f"{ops_type} = await self.custom(input={upstream}, instruction={instruction!r})"

        implement_flows[str(res)] = '\t' + call

    # Once all edges are placed, rebind the last node's result to `solution`,
    # the name the generated return statement reads.
    if len(edges) > 0:
        last_node = list(implement_flows.keys())[-1]
        ops_type = implement_flows[last_node].split("=")[0].strip()
        implement_flows[last_node] = implement_flows[last_node].replace(ops_type, "solution", 1)

    # dict.fromkeys drops duplicate initialisers while keeping first-seen order,
    # so the generated code is identical from run to run (a set is not ordered).
    init_code += "\n".join(dict.fromkeys(init_ops))
    implement_code += "\n".join(implement_flows.values())
    workflow_code = f"""{init_code}

    {implement_code}
    \treturn solution['response'], self.llm.cost_manager.total_cost"""
    data['workflow_code'] = workflow_code # View 4: Global Workflow Code [x] -> ours
    data['code_nodes'] = implement_flows