### Experiment Code [Using GPT API]:

##### Load Dataset 读取数据集

In [1]:
import os
import json
import sys
import glob
import gzip

# load dataset function
def load_split(dataset_dir, split):
    """Load every episode file of one dataset split.

    Episode files are gzip-compressed JSON documents matching
    ``<dataset_dir>/<split>/episodes/*.json.gz``; each one is read as a list
    of episode dicts that carry at least the keys ``"scene"`` and
    ``"object_type"``.

    Args:
        dataset_dir: Root path of the dataset.
        split: Name of the split sub-directory (e.g. "debug", "train",
            "val" or "test").

    Returns:
        A tuple ``(episode_list, dataset)``:
            episode_list: all episodes of all files, in sorted-file order.
            dataset: nested dict ``{scene: {object_type: [episode, ...]}}``
                indexing the same episode dicts by scene and object type.
        Both are empty when no episode file matches.
    """
    pattern = os.path.join(dataset_dir, split, "episodes", "*.json.gz")
    split_paths = sorted(glob.glob(pattern))

    episode_list = []
    dataset = {}

    for split_path in split_paths:
        with gzip.open(split_path, "rt", encoding="utf-8") as f:
            episodes = json.load(f)

        # Index each episode directly under its own scene / object type.
        # Grouping by key (rather than by "value changed since the previous
        # row") avoids spurious None entries, keeps the last group of every
        # file, and never files an episode under the previous scene.
        for data_point in episodes:
            scene_points = dataset.setdefault(data_point["scene"], {})
            scene_points.setdefault(data_point["object_type"], []).append(data_point)

        episode_list += episodes

    return episode_list, dataset


def load_dataset_episode(dataset_path, split_name, index):
    """Return the episode at position ``index`` of a dataset split.

    The whole split is loaded through ``load_split`` on every call; only the
    flat episode list is used, the scene/object index is discarded.
    """
    all_episodes, _ = load_split(dataset_dir=dataset_path, split=split_name)
    return all_episodes[index]

#### 手动执行 Agent (Run the Agent Manually)

In [2]:
from llm_obj_nav_agent import Agent, OpType

# NOTE(review): absolute, machine-specific path — point this at the local dataset root.
dataset_path = "/home/nickbit/ai2thor/dataset/"
split_name = "val" # for some reason, the "test" split is not provided by ai2thor 5.0.0

# Agent shared by the manual and automatic cells below.
# NOTE(review): grid_size is presumably the movement step in metres — confirm in llm_obj_nav_agent.
agent = Agent(action_type=OpType.EIGHT_DIR_MOVE,
              grid_size=0.2)

In [3]:
# Load every episode of the chosen split; `test_episodes` is the flat episode
# list and `dataset` is the scene/object index returned by load_split.
test_episodes, dataset = load_split(dataset_dir=dataset_path , split=split_name)
print("episodes len: %d" % len(test_episodes))

# ep = test_episodes[101]
# print(ep)

episodes len: 1800


In [None]:
# sub_eps = []
# sub_obj_types = []
# for i in range(len(test_episodes)):
#     ep = test_episodes[i]
#     obj_type = ep['object_type']
#     if obj_type not in sub_obj_types:
#         sub_obj_types.append(obj_type)
#         sub_eps.append(ep)
#     else:
#         if i % 50 == 1:
#             sub_eps.append(ep)

# print(len(sub_eps))

In [None]:
# Take the first loaded episode and hand it to the agent's manual first-step
# routine. NOTE(review): presumably this resets the scene for that episode —
# confirm in llm_obj_nav_agent.
ep = test_episodes[0]
agent.mannual_do_first_step(ep)

In [None]:
# Re-run this cell for each further manual step.
# NOTE(review): presumably continues the episode started by mannual_do_first_step — confirm.
agent.mannual_do_step()

In [None]:
# NOTE(review): presumably plots the positions recorded by the agent so far — confirm in llm_obj_nav_agent.
agent.draw_position_points()

In [None]:
# Send a single "RotateRight" controller action with argument 90
# (presumably degrees — confirm) and keep the returned event for inspection.
event = agent.make_controller_single_action("RotateRight", 90)

In [None]:
# event = agent.make_controller_single_action("MoveAhead", 0)

In [None]:
# Teleport test: pick a random object type from the current scene metadata
# and ask the agent to teleport to it.
import random

# A "RotateRight" action with argument 0 is issued only to obtain an event.
# NOTE(review): presumably a no-op rotation that returns fresh metadata — confirm.
event = agent.make_controller_single_action("RotateRight", 0)
objs = event.metadata['objects']
if objs:
    to_tele_name = random.choice(objs)["objectType"]
    print(to_tele_name)
    agent.make_controller_teleport_to_object(to_tele_name)
    event = agent.make_controller_single_action("RotateRight", 0)
else:
    # random.randint(0, -1) / random.choice([]) would raise on an empty list.
    print("No objects in the scene metadata; skipping teleport test.")



In [None]:
# event = agent.make_controller_single_action("LookDown")

In [None]:
# depth_frame = event.depth_frame
# detections = event.instance_detections2D

# for key in detections.keys():
#     print(str(key))

#### 自动执行 Agent (Run the Agent Automatically)

In [4]:
import errno

# Name of this experiment run; it is embedded in both output paths below.
test_name = "sub_ep_test_0804_1_split1"
# Root directory under which one log folder per episode is created.
rec_rootpath = f"./test_records/{test_name}/"
# File that is overwritten with the running result summary after every episode.
temp_result_path = f"./exp_temp_result/{test_name}.txt"

def make_dir_by_filepath(filepath):
    """Create the parent directory of ``filepath`` if it does not exist yet.

    Intermediate directories are created as needed and an already existing
    directory is not an error. A path without a directory component (a bare
    file name) needs no directory, so nothing is done for it.

    Args:
        filepath: Path of a file whose containing directory must exist.

    Raises:
        OSError: If the directory cannot be created (e.g. permission denied,
            or a non-directory already occupies that path).
    """
    parent_dir = os.path.dirname(filepath)
    if not parent_dir:
        # os.makedirs("") raises FileNotFoundError; the current directory
        # already exists, so there is nothing to create.
        return
    os.makedirs(parent_dir, exist_ok=True)


In [5]:
# First episode index visited by the evaluation loop.
start_index = 0
# Upper bound (exclusive) on the episode indices visited by the evaluation
# loop; despite its name it is an end index, not a count starting at start_index.
length = 450
print(rec_rootpath)

./test_records/sub_ep_test_0804_1_split1/


In [None]:
# Initialise the agent with the first episode before the batch run.
agent.init_epsoide(test_episodes[0])
# Running totals updated by the evaluation loop. Re-run this cell before
# re-running the loop, otherwise the totals keep accumulating.
gpt_success = 0            # sum of result[0] over episodes whose result[2] is truthy
true_positive_success = 0  # episodes with result[0] == 1 and result[1] == 1
rule_success = 0           # sum of result[1] over episodes whose result[2] is truthy
exceptioon_occr = 0        # episodes whose result[2] is falsy

In [None]:
# Batch evaluation: run the agent on every selected episode, update the
# running totals, and overwrite a small JSON summary after each episode so
# partial results survive an interrupted run.

# open(..., 'w') does not create missing directories, so make sure the
# folder of the summary file exists before the first episode finishes.
os.makedirs(os.path.dirname(temp_result_path) or ".", exist_ok=True)

# `length` is the exclusive end index; never index past the loaded episodes.
episode_indices = range(start_index, min(length, len(test_episodes)))

for i in episode_indices:
    ep = test_episodes[i]
    # Point the agent's log files at a per-episode directory.
    try:
        rec_path = os.path.join(rec_rootpath, f"{i}_{ep['id']}")
        agent.log_file_path = os.path.join(rec_path, "gpt_chat_log.txt")
        agent.record_file_path = os.path.join(rec_path, "trajectory_record_log.json")
        if not os.path.exists(os.path.dirname(agent.log_file_path)):
            make_dir_by_filepath(agent.log_file_path)
    except Exception as e:
        # Best effort: fall back to shared default log files under the run root.
        print(f"<Set log file path failed! Using default filepath. Detail: {e}>")
        agent.log_file_path = os.path.join(rec_rootpath, "defalut_gpt_chat_log.txt")
        agent.record_file_path = os.path.join(rec_rootpath, "defalut_trajectory_record_log.json")

    # Run the episode automatically.
    # NOTE(review): result is read as (gpt_flag, rule_flag, ok_flag) — confirm
    # against Agent.auto_do_nav_episode.
    result = agent.auto_do_nav_episode(ep)
    if result[2]:
        gpt_success += result[0]
        rule_success += result[1]
        if result[0] == 1 and result[1] == 1:
            true_positive_success += 1
    else:
        exceptioon_occr += 1

    res = {
        "Finished ep index": i,
        "SR": true_positive_success,
        "SR_GPT" : gpt_success,
        "SR_RULE" : rule_success,
        "Exception cnt" : exceptioon_occr
    }
    # Write the intermediate result (overwrites the previous summary).
    with open(temp_result_path, 'w') as f:
        f.write(json.dumps(res))

# Rates are relative to the number of episodes actually run, which differs
# from `length` whenever start_index != 0 or the range was clipped.
# max(..., 1) keeps an empty run from dividing by zero.
num_evaluated = max(len(episode_indices), 1)

print("------------------RESULT------------------")
print(f"SR = {true_positive_success / num_evaluated}")
print(f"SR_GPT = {gpt_success / num_evaluated}")
print(f"SR_RULE = {rule_success / num_evaluated}")
print(f"exception cnt = {exceptioon_occr}")

In [None]:
import os

def count_done_in_second_last_line(dir_path):
    """Count chat logs under ``dir_path`` whose second-to-last line has 'Done'.

    Walks ``dir_path`` recursively and inspects every file named exactly
    ``gpt_chat_log.txt``. A log counts when it has at least two lines and the
    substring 'Done' occurs in the line before the last one.

    Returns:
        The number of matching log files (0 when none are found).
    """
    def _second_last_line_has_done(log_path):
        with open(log_path, 'r') as log_file:
            log_lines = log_file.readlines()
        return len(log_lines) >= 2 and 'Done' in log_lines[-2]

    return sum(
        1
        for folder, _subdirs, filenames in os.walk(dir_path)
        for filename in filenames
        if filename == 'gpt_chat_log.txt'
        and _second_last_line_has_done(os.path.join(folder, filename))
    )

# Directory scanned by the log-analysis cells; `path` is also read by the
# analysis cells that follow this one.
path = rec_rootpath
print(f'Done: {count_done_in_second_last_line(path)}')


In [None]:
import os
import json

def count_done_and_rules_achieved(dir_path):
    """Count episode folders where GPT said 'Done' AND the rules were achieved.

    Walks ``dir_path`` recursively. A folder is inspected when it contains
    ``gpt_chat_log.txt`` together with a trajectory record, named either
    ``trajectory_record_log.txt`` or ``trajectory_record_log.json`` (the
    ``.txt`` name is preferred when both exist).

    * GPT condition: the chat log has at least two lines and 'Done' occurs
      in its second-to-last line.
    * Trajectory condition: the trajectory record (a JSON list) is non-empty
      and its last entry has a truthy ``'rules_achieve'`` value.

    Folders where only the GPT condition holds are printed (false positives).

    Returns:
        The number of folders where both conditions hold.
    """
    trajectory_names = ('trajectory_record_log.txt', 'trajectory_record_log.json')
    count = 0
    for root, dirs, files in os.walk(dir_path):
        if 'gpt_chat_log.txt' not in files:
            continue
        trajectory_name = next((n for n in trajectory_names if n in files), None)
        if trajectory_name is None:
            continue

        gpt_log_path = os.path.join(root, 'gpt_chat_log.txt')
        trajectory_log_path = os.path.join(root, trajectory_name)

        with open(gpt_log_path, 'r') as gpt_log:
            lines = gpt_log.readlines()
        gpt_condition = len(lines) >= 2 and 'Done' in lines[-2]

        with open(trajectory_log_path, 'r') as trajectory_log:
            json_list = json.load(trajectory_log)
        # Evaluated afresh for every folder: an empty record means "not
        # achieved" instead of reusing the previous folder's value (or
        # raising UnboundLocalError on the first folder).
        trajectory_condition = bool(json_list) and bool(json_list[-1]['rules_achieve'])

        if gpt_condition and not trajectory_condition:
            print(gpt_log_path)

        if gpt_condition and trajectory_condition:
            count += 1

    return count
# Report the count returned by count_done_and_rules_achieved for `path`.
print(f'True Positive: {count_done_and_rules_achieved(path)}')


In [None]:
import os
import json

def count_rules_achieved(dir_path):
    """Count episode folders whose trajectory achieved the rules at any step.

    Walks ``dir_path`` recursively and inspects every folder containing a
    trajectory record, named either ``trajectory_record_log.txt`` or
    ``trajectory_record_log.json`` (the ``.txt`` name is preferred when both
    exist). The record is read as a JSON list; the folder counts when at
    least one entry has a truthy ``'rules_achieve'`` value.

    Returns:
        The number of folders that satisfy the condition.
    """
    trajectory_names = ('trajectory_record_log.txt', 'trajectory_record_log.json')
    count = 0
    for root, dirs, files in os.walk(dir_path):
        trajectory_name = next((n for n in trajectory_names if n in files), None)
        if trajectory_name is None:
            continue

        with open(os.path.join(root, trajectory_name), 'r') as trajectory_log:
            json_list = json.load(trajectory_log)

        # An empty (or null) record has no step that could have achieved the rules.
        if json_list and any(each['rules_achieve'] for each in json_list):
            count += 1

    return count
# Report the count returned by count_rules_achieved for `path`.
print(f'Positive: {count_rules_achieved(path)}')

In [None]:
# Test parsing output string
import re
msg_string = """Thought [1]: The target object, GarbageCan, is not visible in the current observation. The maximum distance of accessibility is in the Rear direction with a distance of 6.50m. However, there are many objects in that direction, and the GarbageCan is not among them. The Rear Left direction has a maximum distance of accessibility of 1.77m, and there are also many objects in that direction, but again, the GarbageCan is not among them. The other directions have limited accessibility. Therefore, the best course of action would be to move towards the Rear direction, which has the highest accessibility and a variety of objects, increasing the chances of finding the GarbageCan.

Action [1]: MoveAhead 6.50

Observation [2]:
<Direction: Front
Objects detail: [{"RemoteControl": "Middle center, 0.67m"}, {"Pot": "Middle right, 2.88m"}, {"Chair": "Middle center and Down center, 0.67m"}, {"Desk": "Middle center, 0.66m"}, {"Painting": "Middle center and Top center, 1.24m"}, {"ArmChair": "Middle right and Down right, 5.12m"}, {"Floor": "Middle center and Down center, 2.28m"}, {"Bed": "Middle right, 0.71m"}, {"BaseballBat": "Down center, 1.69m"}]
Maximum distance of accessibility: "0.50m"""

# Scan the reply line by line: remember the text of the latest "Thought [n]:"
# line and stop at the first "Action [n]:" line. Only the matching line itself
# is captured, so text continuing on following lines is not included.
thought_re = re.compile(r'^Thought\s*\[(\d+)\]:\s*(.*)$')
action_re = re.compile(r'^Action\s*\[(\d+)\]:\s*(.*)$')

thought_str = ""
action_str = ""
find_flag = False
for line in msg_string.split('\n'):
    thought_hit = thought_re.match(line)
    if thought_hit:
        thought_str = thought_hit.group(2)
        continue
    action_hit = action_re.match(line)
    if action_hit:
        action_str = action_hit.group(2)
        find_flag = True
        break

if find_flag:
    print(f"T: {thought_str} \nA: {action_str}")
else:
    print("[Operation]Error: receiving action output string with wrong format.")

In [None]:
# Test parsing output string
import re
# Parse one action string: either "MoveTo <param>" / "RotateTo <param>"
# (groups 1 and 2) or the bare word "Done" (group 3).
action_str = "Done"
action_pattern = r"\s*(MoveTo|RotateTo)\s*(.*)\s*|^(Done)$"
action_match = re.match(action_pattern, action_str)
if action_match is None:
    print("[Output Extract]The input string does not match the ACTION pattern.")
else:
    done_token = action_match.group(3)
    if done_token:
        # "Done" carries no parameter.
        print(done_token + "?")
        name, param = done_token, ""
    else:
        name = action_match.group(1)
        param = action_match.group(2)
        print(param)
    res_dict = {
        'name': name,
        'param': param,
        'action_type': OpType.FOLLOW_OBJ
    }
    print(res_dict)