In [82]:
import re
import ast
import os
def safe_var_name(name):
    """Return ``name`` with characters that are illegal in identifiers spelled out.

    Brackets, parentheses, spaces, ``+`` and ``-`` are each replaced by a
    word token (e.g. ``[`` becomes ``LeftBracket``); every other character is
    left untouched.
    """
    char_to_token = (
        ('[', 'LeftBracket'),
        (']', 'RightBracket'),
        ('(', 'LeftParen'),
        (')', 'RightParen'),
        (' ', 'Space'),
        ('+', 'PLUS'),
        ('-', 'Hyphen'),
    )
    for char, token in char_to_token:
        name = name.replace(char, token)
    return name

def unsafify_var_name(name):
    """Turn the word tokens produced by ``safe_var_name`` back into characters.

    Each token (``LeftBracket``, ``RightBracket``, ``LeftParen``,
    ``RightParen``, ``Space``, ``PLUS``, ``Hyphen``) is replaced wherever it
    occurs, so a name that literally contained one of these words is
    converted as well.
    """
    token_to_char = (
        ('LeftBracket', '['),
        ('RightBracket', ']'),
        ('LeftParen', '('),
        ('RightParen', ')'),
        ('Space', ' '),
        ('PLUS', '+'),
        ('Hyphen', '-'),
    )
    # Applied one after another, in this order.
    for token, char in token_to_char:
        name = name.replace(token, char)
    return name


def parse_wfcode_to_code(wfcode):
    """Rewrite ``workflow.*`` builder statements as instantiate-then-invoke code.

    Three statement forms are recognised, one per line (other lines, and
    lines that do not match the expected shape, are ignored):

    * ``workflow.add_node("<id>", "<type>", {<params>})``
    * ``workflow.invoke_node([<output names>], "<id>")``
    * ``workflow.connect("<source>", "<target id>", "<input name>")``

    The result is a string with one ``<id> = <type>(<param>=<value>, ...)``
    line per added node, followed by one
    ``<outputs> = <id>(<input>=<source>, ...)`` line per invoked node.
    Identifiers are passed through ``safe_var_name``.

    Raises whatever ``ast.literal_eval`` raises when a parameter dict or an
    output list is not a valid Python literal.
    """
    node_specs = {}    # node id -> (node type, parameter dict)
    node_outputs = {}  # node id -> output names given to invoke_node
    wiring = {}        # target node id -> {input name: source name}

    # Pass 1: collect the statements.
    for raw_line in wfcode.strip().split('\n'):
        stmt = raw_line.strip()
        if stmt.startswith('workflow.add_node'):
            # e.g. workflow.add_node("vaedecode_8", "VAEDecode", {})
            found = re.match(r'workflow\.add_node\("([^"]+)", "([^"]+)", (.+)\)', stmt)
            if found:
                node_id, node_type, params_src = found.groups()
                node_specs[node_id] = (node_type, ast.literal_eval(params_src))
        elif stmt.startswith('workflow.invoke_node'):
            # e.g. workflow.invoke_node(["latent_5"], "emptylatentimage_5")
            found = re.match(r'workflow\.invoke_node\((.+), "([^"]+)"\)', stmt)
            if found:
                outputs_src, node_id = found.groups()
                node_outputs[node_id] = ast.literal_eval(outputs_src)
        elif stmt.startswith('workflow.connect'):
            # e.g. workflow.connect("clip_4", "cliptextencode_7", "clip")
            found = re.match(r'workflow\.connect\("([^"]+)", "([^"]+)", "([^"]+)"\)', stmt)
            if found:
                source_id, target_id, input_name = found.groups()
                # A later connect to the same input overrides the earlier one.
                wiring.setdefault(target_id, {})[input_name] = source_id

    # Pass 2: emit the instantiation lines.
    emitted = ['# create nodes by instantiation']
    for node_id, (node_type, params) in node_specs.items():
        # String values are wrapped in triple quotes; everything else uses repr().
        kwargs = ', '.join(
            f'{key}=' + (f'"""{value}"""' if isinstance(value, str) else repr(value))
            for key, value in params.items()
        )
        emitted.append(f'{safe_var_name(node_id)} = {safe_var_name(node_type)}({kwargs})')

    # Pass 3: emit the invocation lines.
    emitted.append('')
    emitted.append('# link nodes by invocation')
    for node_id, output_names in node_outputs.items():
        targets = ', '.join([safe_var_name(out) for out in output_names])
        call_args = ', '.join(
            [
                f'{safe_var_name(input_name)}={safe_var_name(source_id)}'
                for input_name, source_id in wiring.get(node_id, {}).items()
            ]
        )
        emitted.append(f'{targets} = {safe_var_name(node_id)}({call_args})')

    return '\n'.join(emitted)

def read_md(path):
    """Return the entire text content of the file at ``path``.

    The file is opened in text mode with the platform's default encoding.
    """
    with open(path, mode='r') as md_file:
        contents = md_file.read()
    return contents

def get_node_knowledge(node_code, md_dir="./docs/node", strict=True, res=None):
    """Collect the documentation of every node type called in ``node_code``.

    Callee names are taken from ``= <name>(`` patterns in ``node_code``,
    passed through ``unsafify_var_name`` and normalised (spaces, ``_``, ``-``
    and parentheses removed, lower-cased). File names in ``md_dir`` are
    normalised the same way from the part before their first ``.``. For each
    callee the first matching file, in ``os.listdir`` order, is read.

    Args:
        node_code: Source text to scan for node invocations.
        md_dir: Directory containing one documentation file per node.
        strict: When true a callee must equal a normalised file name; when
            false it only has to be a substring of one.
        res: Optional list that matched doc names are appended to (mutated in
            place). A fresh list is created when omitted, so results no
            longer leak between calls through a shared default.

    Returns:
        A ``(knowledge, result)`` tuple. ``knowledge`` is the concatenated doc
        text wrapped in ``<node knowledge>`` tags. ``result`` is ``res`` (or
        the new list) extended with the matched file names minus their
        ``.md`` suffix. Only strict matches are recorded in ``result``.
    """
    def _normalize(label):
        # Remove separators and parentheses, then lower-case.
        for ch in (" ", "_", "-", "(", ")"):
            label = label.replace(ch, "")
        return label.lower()

    def _doc_name(filename):
        # str.strip(".md") would strip the characters '.', 'm', 'd' from both
        # ends (e.g. "ImageBlend.md" -> "ImageBlen"); remove the suffix only.
        return filename[:-3] if filename.endswith(".md") else filename

    result = [] if res is None else res

    called = re.findall(r'=\s*([a-zA-Z_][a-zA-Z_0-9\-]*)\s*\(', node_code)
    node_names = [_normalize(unsafify_var_name(name)) for name in called]
    all_nodes = [(_normalize(fname.split(".")[0]), fname) for fname in os.listdir(md_dir)]

    sections = []
    for name in node_names:
        for key, fname in all_nodes:
            matched = (name == key) if strict else (name in key)
            if not matched:
                continue
            sections.append("Node " + read_md(os.path.join(md_dir, fname)) + "\n")
            if strict:
                result.append(_doc_name(fname))
            break  # only the first matching doc is used per callee

    node_knowledge = "".join(sections)
    return f"<node knowledge>\n{node_knowledge}\n</node knowledge>\n\n", result

In [None]:
# from code
# Run get_node_knowledge over every code template in both collections,
# accumulating the matched doc names in `res`.
res = []
for code_dir in ("workflow/code", "workflow_comfybench/code"):
    for template_name in os.listdir(code_dir):
        with open(f"./{code_dir}/{template_name}", "r") as code_file:
            code = code_file.read()
        _, res = get_node_knowledge(code, res=res)
# Deduplicate, then report how many distinct names were found and which.
res = set(res)
print(len(res))
print(res)

In [84]:
# from workflow
import json

# Record the "type" of every node listed in each raw workflow JSON file of
# both collections; duplicates are kept in `tmp`.
tmp = []
for raw_dir in ("workflow/raw", "workflow_comfybench/raw"):
    for template_name in os.listdir(raw_dir):
        with open(f"./{raw_dir}/{template_name}", "r") as raw_file:
            nodes = json.load(raw_file)["nodes"]
        tmp.extend(node["type"] for node in nodes)

In [85]:
# Every name collected in `res` and `tmp`.
all_templates = set(res) | set(tmp)

# Delete each doc file whose name before the first "." is not in that set.
for docs_dir in ("./docs/node", "./docs/template"):
    for item in os.listdir(docs_dir):
        if item.split(".")[0] in all_templates:
            continue
        os.remove(f"{docs_dir}/{item}")


In [86]:
# Names (text before the first ".") present in each docs directory.
nodes = {fname.split(".")[0] for fname in os.listdir("./docs/node")}
template = {fname.split(".")[0] for fname in os.listdir("./docs/template")}
# Show what each directory has that the other lacks.
print(nodes.difference(template))
print(template.difference(nodes))

set()
set()
