In [None]:
import gym
import simworld_gym
import os

# Port used to reach the simulator; taken from the UE_PORT environment
# variable and falling back to 2000 when it is not set.
ue_port = int(os.environ.get('UE_PORT', '2000'))
print(ue_port)

9998


In [None]:
# Reward shaping handed to the environment through `reward_setting=`.
reward_setting = dict(
    human_collision_penalty=-1,     # colliding with a human
    object_collision_penalty=-0.1,  # colliding with an object
    action_penalty=-0.1,            # charged per action, which favours shorter paths
    success_reward=10.0,            # successfully reaching the goal
    off_track_penalty=-0.01,        # going off track
)

# Model identifier; also used to name the log directories.
model = "gpt-5"

# Create the environment with the reward shaping defined above.
env = gym.make(
    'gym_citynav/SimpleWorld-v0',
    port=ue_port,
    resolution=(720, 600),
    render_mode="rgb_array",
    observation_type="all",
    record_video=False,
    log_dir=f"{model}_simple",
    reward_setting=reward_setting,
)

INFO:__init__:230:Got connection confirm: b'connected to gym_citynav'


=>Info: using ip-port socket


In [None]:
from prompt_template import *
from agents import ReasoningAgent
import os
# Prompt options forwarded to nav_template; all three are disabled for this run.
# `strip` is also read by the evaluation loop to decide whether the current
# view is split into strips before being sent to the agent.
strip = full = depth = False

# Agent driving the navigation, configured with the model chosen earlier.
agent = ReasoningAgent(model=model, system_prompt=nav_template(full, depth, strip))

In [None]:
from utils import action_history_text, numpy_to_base64, split_into_strips, display_images, extract_action_dict
import cv2
import numpy as np

input_tokens = 0
output_tokens = 0
worlds = [f"map_road_20_{i}" for i in range(52, 75)]
task_2_test = ["task_dist_21_0_1"]
action_mapping = ["Move_Forward", "Rotate_Left", "Rotate_Right", "Move_Left", "Move_Right", "Subtask_Complete"]
for world in worlds:
    reseted = False
    for task in task_2_test:
        task_path = os.path.join("single_agent_world", "easy", world, task)
        world_json = os.path.join(task_path, "progen_world.json")
        agent_json = os.path.join(task_path, "task_config.json")
        if not reseted:
            options = {
                "task_path": task_path,
                "agent_json": agent_json,
                "world_json": world_json,
            }
            reseted = True
        else:
            options = {
                "task_path": task_path,
                "agent_json": agent_json,
            }
        observation, info = env.reset(options=options)
        vision_cue = info["current_instruction"]["image"]
        instruction = info["current_instruction"]["text"]
        action_history = []
        chosen_actions = []
        
        summary = """
                Status: Ready to Start. 
                No obstacles found. 
                No intersections seen. 
                The Landmark to be spotted: Just started, unknown.
                Have not seen the landmark yet.
                """
        last_vision_descriptions = ""
        folder_path = os.path.join("/SimWorld", "agent_log", f"{model}_simple", f"{world}", f"{task}")
        os.makedirs(folder_path , exist_ok=True)

        i = 0
        terminated = False
        last_position = None
        current_position = None
        parse_failure_count = 0

        while not terminated:
            orientation = info['agent']['agent_rotation']
            current_position = info['agent']['agent_location']
            forward_count = 0
            for a in chosen_actions:
                if a == 0:
                    forward_count += 1
            if not last_position is None and forward_count > 3 and np.linalg.norm(np.array(current_position) - np.array(last_position)) < 0.5:
                print("Stuck in place, terminating.")
                with open(os.path.join(folder_path, "LLM_Output.txt"), "a") as llm_logger:
                    llm_logger.write("[STUCK]"'\n')
                break
            last_position = current_position
            cv2.imwrite(os.path.join(folder_path, f"{i}.png"), observation["rgb"])
            segment_img = numpy_to_base64(observation["object_mask"])
            if strip:
                input_imgs = split_into_strips(observation["rgb"])
                input_imgs.append(numpy_to_base64(vision_cue))
                descriptions = ['The view on the left', 'The horizontal center', 'The right', 'The expected view']
            else:
                input_imgs = [numpy_to_base64(observation["rgb"]), numpy_to_base64(vision_cue)]
                descriptions = ['The current view', 'The expected view']
            input_imgs.append(segment_img)
            descriptions.append('The object segmentation mask of the current view')

            display_images(observation["rgb"], vision_cue)

            nav_instance = (
                f"Actions taken: {action_history_text(action_history, action_mapping)}\n\n"
                f"Your Last Move: {chosen_actions}\n\n"
                f"Vision Description Last Step: {last_vision_descriptions}\n\n"
                f"Status of Last Step: {summary}\n\n"
                f"Current Subtask: {instruction}\n\n"
                f"Current Orientation: {orientation}\n\n"
            )
            result = agent.act(
                nav_instance, 
                input_imgs, 
                descriptions, 
            )
            result_dict = extract_action_dict(result["output"])
            try:
                last_vision_descriptions = result_dict.get("Description")
                summary = result_dict.get("Summary")
                match = result_dict.get("Match")
                chosen_actions = result_dict.get("Actions")
            except Exception as e:
                print(f"Perception error")
                parse_failure_count += 1
                if parse_failure_count > 10:
                    print("Too many perception errors, terminating.")
                    with open(os.path.join(folder_path, "LLM_Output.txt"), "a") as llm_logger:
                        llm_logger.write("[TOO MANY ERRORS]"'\n')
                    break
                continue
            print("[vision]", last_vision_descriptions)
            print("[reason]", result["reason"])
            input_tokens += result["usage"]["input"]
            output_tokens += result["usage"]["output"]
            with open(os.path.join(folder_path, "LLM_Output.txt"), "a") as llm_logger:
                llm_logger.write("[current subtask]" + instruction +'\n')
                llm_logger.write(f"[vision {i}]" + str(last_vision_descriptions).replace("\n", "") +'\n')
                llm_logger.write(f"[reason {i}]" + str(result["reason"]).replace("\n", "") + '\n')
                llm_logger.write(f"[summary {i}]" + str(summary).replace("\n", "") + '\n')
                llm_logger.write(f"[actions {i}]" + str(chosen_actions) + '\n')
                llm_logger.write(f"[match {i}]" + str(match) + '\n')
            if not chosen_actions:
                print("No action specified. Failing.")
                break
            for chosen_action in chosen_actions:
                action_history.append(chosen_action)
                i += 1
                match chosen_action:
                    case -1:
                        observation, _, terminated, _, info = env.step(-1)
                        if terminated:
                            print("End")
                            break
                        instruction = info["current_instruction"]["text"]
                        vision_cue = info["current_instruction"]["image"]
                        action_history = []
                    case 0:
                        observation, _, terminated, _, info = env.step(0)
                    case 1:
                        observation, _, terminated, _, info = env.step(5)
                    case 2:
                        observation, _, terminated, _, info = env.step(4)
                    case 3:
                        observation, _, terminated, _, info = env.step(2)
                    case 4:
                        observation, _, terminated, _, info = env.step(3)
            if terminated:
                print("End")
                break
            

In [None]:
# Close the environment once all runs are finished
# (presumably this releases the simulator connection — confirm).
env.close()