In [None]:
### This script is used to test local planner module.

In [None]:
import sys
import os
from pathlib import Path
sys.path.append(str(Path().resolve().parent))
from simworld.communicator.unrealcv import UnrealCV
from simworld.communicator.communicator import Communicator
from simworld.llm.a2a_llm import A2ALLM
from simworld.agent.humanoid import Humanoid
from simworld.utils.vector import Vector
from simworld.map.map import Map
from simworld.config.config_loader import Config
from simworld.local_planner.local_planner import LocalPlanner

In [None]:
os.environ['OPENAI_API_KEY'] = 'grit'

In [None]:
config = Config()

In [None]:
# Initialize the communicator
communicator = Communicator(UnrealCV(ip='192.168.0.109'))

In [None]:
# Initialize the LLM model
llm = A2ALLM(url='http://125.132.149.84:59600/',model_name='google/gemma-3-4b-it', provider='openai')

In [None]:
# Initialize the map
map = Map(config)
map.initialize_map_from_file(roads_file='/Users/kangminwoo/Document/GitHub/SimWorld/output/roads.json') # use default map

In [None]:
# Initialize the humanoid agent
humanoid = Humanoid(position=Vector(1700, 1700), direction=Vector(1, 0), map=map, communicator=communicator, config=config)

In [None]:
# Initialize the local planner
local_planner = LocalPlanner(agent=humanoid, model=llm, rule_based=True)

In [None]:
communicator.spawn_object('GEN_BP_Box_1_C', '/Game/InteractableAsset/Box/BP_Interactable_Box.BP_Interactable_Box_C', (1700, -1700, 20), (0, 0, 0))

In [None]:
### Test parser
plan = 'Go to (1700, -1700) and pick up GEN_BP_Box_1_C.'
action_space = local_planner.parse(plan)

In [None]:
communicator.spawn_agent(humanoid, config['user.model_path'])

In [None]:
communicator.spawn_ue_manager(config['simworld.ue_manager_path'])

In [None]:
# Update humanoid position and direction in a separate thread
import threading
import time

def update(exit_event):
    while not exit_event.is_set():
        humanoid_ids = [humanoid.id]
        result = communicator.get_position_and_direction(humanoid_ids=humanoid_ids)
        for idx in humanoid_ids:
            pos, dir = result[('humanoid', idx)]
            humanoid.position = pos
            humanoid.direction = dir
        time.sleep(0.1)

exit_event = threading.Event()
t = threading.Thread(target=update, args=(exit_event,))
t.start()

In [None]:
### Test executor
local_planner.execute(action_space)

In [None]:
communicator.humanoid_pick_up_object(humanoid.id, 'GEN_BP_Box_1_C')

In [None]:
exit_event.set()
t.join()

In [None]:
#communicator.clear_env()

In [None]:
communicator.disconnect()