In [3]:
import pandas as pd
import re
import json


Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


In [4]:
def parse_thought_action(dict_str):
    """Extract the 'thought' and 'action' values from a dict-like string.

    The values are located with regular expressions rather than by evaluating
    the string: the thought is everything between ``'thought':`` and the
    following ``, 'action'``; the action is everything between ``'action':``
    and the first ``}`` after it. Backslashes and both kinds of quote
    characters are then removed from each captured value.

    Args:
        dict_str: Text that looks like ``{'thought': ..., 'action': ...}``.

    Returns:
        dict: ``{"thought": <str or None>, "action": <str or None>}``. A field
        is ``None`` when its pattern does not match, or when ``dict_str`` is
        not a string at all (for example a missing CSV cell).
    """
    def _clean(match):
        # An unmatched field stays None instead of crashing on None.replace().
        if match is None:
            return None
        return match.group(1).replace("\\", "").replace("\"", "").replace("'", "")

    if not isinstance(dict_str, str):
        return {"thought": None, "action": None}

    thought_match = re.search(r"'thought':\s*(.+?)\s*,\s*'action'", dict_str)
    action_match = re.search(r"'action':\s*(.+?)\s*}", dict_str)
    return {"thought": _clean(thought_match), "action": _clean(action_match)}

In [5]:
def to_dict(input_string):
    """Convert the textual form of an action dict into a compact action string.

    The fields ``action_type``, ``element_id``, ``url`` and ``fill_text`` are
    pulled out of ``input_string`` with a regular expression. A value may be an
    enum repr (``<Enum.NAME: n>``), a run of digits, or a single- or
    double-quoted string; the surrounding quotes are removed in both cases.

    Args:
        input_string: Text that looks like
            ``{'action_type': <...>, 'element_id': 3, ...}``.

    Returns:
        str: One of ``google_search[text]``, ``fill_search[id,text]``,
        ``goto[url]``, ``click[id]``, ``"None"``, or ``""`` when the action
        type is missing, unrecognised, or the input is not a string.

    Raises:
        KeyError: If the action type is recognised but a field it needs
            (``fill_text``, ``element_id`` or ``url``) was not found.
    """
    if not isinstance(input_string, str):
        return ""

    pattern = r"('action_type'|'element_id'|'url'|'fill_text'):\s*(<[^>]+>|\d+|'[^']+'|\"[^\"]+\")"
    extracted_fields = {}
    for field_name, field_value in re.findall(pattern, input_string):
        key = field_name.strip("'")
        if field_value.startswith('<') and field_value.endswith('>'):
            # Enum repr: keep only what follows the last dot, e.g. "CLICK: 1".
            extracted_fields[key] = field_value.split('.')[-1].strip('<> ')
        elif field_value[0] in "'\"" and field_value[-1] == field_value[0]:
            # Drop the matching outer quotes, whichever kind was used.
            extracted_fields[key] = field_value[1:-1]
        else:
            extracted_fields[key] = field_value

    action_type = extracted_fields.get("action_type", "").lower()
    action = ""
    # Order matters: these are substring checks on the action type.
    if "google_search" in action_type:
        action = "google_search" + "[" + extracted_fields["fill_text"] + "]"
    elif "fill_search" in action_type or "fill_form" in action_type:
        # NOTE(review): fill_form is emitted under the "fill_search" label,
        # as in the original code; confirm this is intentional.
        action = "fill_search" + \
            "[" + str(extracted_fields["element_id"]) + "," + \
            extracted_fields["fill_text"] + "]"
    elif "goto" in action_type:
        action = "goto" + "[" + extracted_fields["url"] + "]"
    elif "click" in action_type:
        action = "click" + "[" + str(extracted_fields["element_id"]) + "]"
    elif "none" in action_type:
        action = "None"
    return action

In [24]:
def score_rate(score):
    """Turn a ``"numerator/denominator"`` string into its float ratio.

    Args:
        score: Text holding exactly one ``/`` separating two numbers.

    Returns:
        float: The numerator divided by the denominator.
    """
    numerator, denominator = map(float, score.split("/"))
    return numerator / denominator

In [7]:
def parse_step_reward(dict_str):
    """Extract the 'score' and 'description' values from a dict-like string.

    The score is everything between ``'score':`` and the following
    ``, 'description'``; the description is everything between
    ``'description':`` and the first ``}`` after it. Backslashes and both
    kinds of quote characters are removed from each captured value, so the
    score is returned as text, not as a number.

    Args:
        dict_str: Text that looks like ``{'score': ..., 'description': ...}``.

    Returns:
        dict: ``{"score": <str or None>, "description": <str or None>}``. A
        field is ``None`` when its pattern does not match, or when
        ``dict_str`` is not a string.
    """
    def _clean(match):
        # An unmatched field stays None instead of crashing on None.replace().
        if match is None:
            return None
        return match.group(1).replace("\\", "").replace("\"", "").replace("'", "")

    if not isinstance(dict_str, str):
        return {"score": None, "description": None}

    score_match = re.search(r"'score':\s*(.+?)\s*,\s*'description'", dict_str)
    description_match = re.search(r"'description':\s*(.+?)\s*}", dict_str)
    return {
        "score": _clean(score_match),
        "description": _clean(description_match),
    }


def process_step_reward(dict_str):
    """Normalise a raw step-reward cell into a dict.

    Args:
        dict_str: The raw cell value. ``"x"`` and ``"finished"`` (compared
            case-insensitively) are special markers; any other string is
            handed to ``parse_step_reward``.

    Returns:
        dict: ``{}`` for the ``"x"`` marker or a non-string value,
        ``{"score": 10, "description": "finished"}`` for the ``"finished"``
        marker, otherwise the result of ``parse_step_reward(dict_str)``.
    """
    # A non-string cell (e.g. a missing value) has no reward to parse.
    if not isinstance(dict_str, str):
        return {}

    marker = dict_str.lower()
    if marker == "x":
        return {}
    if marker == "finished":
        # The key is "score" (no trailing colon) so it matches the key
        # produced by parse_step_reward.
        return {"score": 10, "description": "finished"}
    return parse_step_reward(dict_str)

In [8]:
def write_to_json(file_path):
    """Load one result CSV and turn every row into a JSON-ready step dict.

    Despite its name, this function writes nothing; it returns the list of
    step dicts and leaves serialisation to the caller.

    The first CSV column is discarded, ``step_index`` is shifted up by one,
    and the ``trace``, ``action``, ``score`` and ``step_reward`` columns are
    converted with ``parse_thought_action``, ``to_dict``, ``score_rate`` and
    ``process_step_reward`` respectively. Missing ``selector`` values and a
    ``step url`` equal to ``"finished"`` become empty strings in the output.

    Args:
        file_path: Path of the CSV file to read.

    Returns:
        list[dict]: One dict per CSV row, in row order.
    """
    def _as_step(row):
        # Falsy parsed values fall back to an empty container of the same kind.
        return {
            "step_index": row["step_index"],
            "trace_description": row["trace_to_dict"] or {},
            "selector": "" if row["selector"] == "None" else row["selector"],
            "action": row["action_to_str"] or "",
            "task_score": row["score"],
            "task_score_rate": row["score_rate"],
            "current_reward_score_description": row["step_reward"],
            "url": "" if row["step url"] == "finished" else row["step url"],
        }

    frame = pd.read_csv(file_path, index_col=False)
    first_column = frame.columns[0]
    frame = frame.drop(first_column, axis=1)

    frame["step_index"] += 1
    frame["trace_to_dict"] = frame["trace"].apply(parse_thought_action)
    frame["action_to_str"] = frame["action"].apply(to_dict)
    frame["score_rate"] = frame["score"].apply(score_rate)
    frame["step_reward"] = frame["step_reward"].apply(process_step_reward)
    frame["selector"] = frame["selector"].fillna("None")

    wanted_columns = [
        "step_index",
        "trace_to_dict",
        "selector",
        "action_to_str",
        "score",
        "score_rate",
        "step_reward",
        "step url",
    ]
    selected = frame[wanted_columns]

    return [_as_step(row) for _, row in selected.iterrows()]

In [9]:
# import os
# import pandas as pd

# folder_path = './csv_results/Dom-based'
# task_list = []
# for _, filename in enumerate(os.listdir(folder_path)):
#     out_json = {}
#     task_name = filename.split("_")[1]
#     out_json["task_id"] = int(filename.split("_")[0])
#     out_json["task_name"] = task_name
#     out_json["task_status"] = filename.split("_")[-2]
#     file_path = os.path.join(folder_path, filename)
#     if os.path.isfile(file_path):
#         task_step_list = write_to_json(file_path)
#         out_json["step_list"] = task_step_list
#         task_list.append(out_json)
# print(task_list)
# task_list = sorted(task_list, key=lambda x: x['task_id'])
# if not os.path.exists("./results/Dom-based/"):
#     os.makedirs("./results/Dom-based")
# out_json_file_path = './results/Dom-based/out.json'
# with open(out_json_file_path, 'w') as json_file:
#     json.dump(task_list, json_file)

In [26]:
import json5
def read_file(file_path="./data/group1.json"):
    """Read the label data and return the task names in file order.

    Args:
        file_path: Path of the label file; it is opened with GBK encoding and
            parsed with ``json5``.

    Returns:
        list: The ``"task"`` value of every entry in the file.
    """
    with open(file_path, encoding='gbk') as label_file:
        labelled_tasks = json5.load(label_file)
    return [entry["task"] for entry in labelled_tasks]
# Load the task names once from the default label file; the export cell
# below looks a task's name up in this list by its numeric task id.
task_name_list = read_file()

['Find vitamin D that are buy 1 get 1 free and new arrival in cvs', 'Confirm my vip tour at the six flags Discovery Kingdom in sixflags', 'Show me movies produced by Aaron Horvath in imdb', 'Find Airport information of Camarillo Airport, CA and check weather in flightaware', 'Find a pasta restaurant in Sydney and save it in resy', 'Find an Xbox Wireless controller rated above 4 stars in newegg', 'View the toddler collection and add one pair of the cheapest socks for a 6 months to 5 years to the wishlist in uniqlo', 'Search for hiking boots and filter the results to show only those with a waterproof rating of at least 3 stars in rei', "Locate a large store in Washington that has kids' and maternity products, also check if they have a parking lot, and see the directions of the nearest store in uniqlo", 'Compare two wireless printers that are rated above 4 stars in newegg', 'Find comedy tv shows on netflix sorted by audience score in rottentomatoes', 'Search for holiday campground in Alas

In [27]:
import os
import pandas as pd

# Convert every per-task CSV in folder_path into one JSON file.
# File names are expected to look like "<task_id>_..._<status>_<suffix>":
# the first "_"-separated field is the numeric task id and the
# second-to-last field is the task status.
folder_path = './csv_results/group1/Dom-based'
task_list = []
for filename in os.listdir(folder_path):
    file_path = os.path.join(folder_path, filename)
    # Check the entry type before parsing its name, so sub-directories
    # (e.g. notebook checkpoint folders) do not crash the int() conversion.
    if not os.path.isfile(file_path):
        continue
    name_parts = filename.split("_")
    try:
        task_id = int(name_parts[0])
    except ValueError:
        task_id = None
    if task_id is None or len(name_parts) < 2:
        # Not a result file named after the expected scheme; report and skip.
        print("Skipping unexpected file:", filename)
        continue
    out_json = {}
    out_json["task_id"] = task_id
    out_json["task_name"] = task_name_list[task_id]
    task_status = name_parts[-2]
    out_json["task_status"] = task_status
    task_step_list = write_to_json(file_path)
    out_json["step_list"] = task_step_list
    task_list.append(out_json)
print(task_list)
task_list = sorted(task_list, key=lambda x: x['task_id'])
# exist_ok avoids the separate existence check and the race it implies.
os.makedirs("./results/group1/Dom-based", exist_ok=True)
out_json_file_path = './results/group1/Dom-based/out.json'
with open(out_json_file_path, 'w') as json_file:
    json.dump(task_list, json_file)



In [2]:
import os
def batch_rename(work_dir, max_field_length=10):
    """Blank out an over-long second name field for every entry in a directory.

    Each entry name is split on ``_``. When the second field is longer than
    ``max_field_length`` characters it is replaced by an empty string and the
    entry is renamed, so ``"3_some-long-task-name_ok_1.csv"`` becomes
    ``"3__ok_1.csv"``. Only the second field is touched, even if the same text
    appears elsewhere in the name. Names without any ``_`` are left alone.

    Args:
        work_dir: Directory whose entries are renamed in place.
        max_field_length: Longest second field that is kept unchanged.
    """
    for filename in os.listdir(work_dir):
        parts = filename.split("_")
        # No underscore means there is no second field to inspect.
        if len(parts) < 2 or len(parts[1]) <= max_field_length:
            continue
        parts[1] = ""
        new_name = "_".join(parts)
        # Rename the entry inside work_dir.
        os.rename(
            os.path.join(work_dir, filename),
            os.path.join(work_dir, new_name)
        )
# Rename the entries of the result directory in place.
# NOTE(review): the export cell reads './csv_results/group1/Dom-based', which
# differs from this path in letter case; confirm both refer to the same
# directory when running on a case-sensitive file system.
work_directory = './csv_results/group1/DOM-Based'  # replace with your working directory
batch_rename(work_directory)