Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion adf_core_python/core/agent/info/world_info.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Any
from typing import Any, Optional

from rcrs_core.entities.entity import Entity
from rcrs_core.worldmodel.changeSet import ChangeSet
from rcrs_core.worldmodel.entityID import EntityID
from rcrs_core.worldmodel.worldmodel import WorldModel
Expand Down Expand Up @@ -35,3 +36,40 @@ def set_change_set(self, change_set: ChangeSet) -> None:
Change set
"""
self._change_set = change_set

def get_entity(self, entity_id: EntityID) -> Optional[Entity]:
"""
Get the entity

Parameters
----------
entity_id : EntityID
Entity ID

Returns
-------
Optional[Entity]
Entity
"""
return self._world_model.get_entity(entity_id)

def get_entity_ids_of_type(self, entity_type: type[Entity]) -> list[EntityID]:
"""
Get the entity IDs of the specified type

Parameters
----------
entity_type : type[Entity]
Entity type

Returns
-------
list[EntityID]
Entity IDs
"""
entity_ids: list[EntityID] = []
for entity in self._world_model.get_entities():
if isinstance(entity, entity_type):
entity_ids.append(entity.get_id())

return entity_ids
106 changes: 106 additions & 0 deletions adf_core_python/implement/extend_action/default_extend_action_move.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
from typing import Optional, cast

from rcrs_core.entities.area import Area
from rcrs_core.entities.blockade import Blockade
from rcrs_core.entities.entity import Entity
from rcrs_core.entities.human import Human
from rcrs_core.worldmodel.entityID import EntityID

from adf_core_python.core.agent.action.common.action_move import ActionMove
from adf_core_python.core.agent.communication.message_manager import MessageManager
from adf_core_python.core.agent.develop.develop_data import DevelopData
from adf_core_python.core.agent.info.agent_info import AgentInfo
from adf_core_python.core.agent.info.scenario_info import Mode, ScenarioInfo
from adf_core_python.core.agent.info.world_info import WorldInfo
from adf_core_python.core.agent.module.module_manager import ModuleManager
from adf_core_python.core.agent.precompute.precompute_data import PrecomputeData
from adf_core_python.core.component.extaction.ext_action import ExtAction
from adf_core_python.core.component.module.algorithm.path_planning import PathPlanning


class DefaultExtendActionMove(ExtAction):
def __init__(
self,
agent_info: AgentInfo,
world_info: WorldInfo,
scenario_info: ScenarioInfo,
module_manager: ModuleManager,
develop_data: DevelopData,
) -> None:
super().__init__(
agent_info, world_info, scenario_info, module_manager, develop_data
)
self._target_entity_id: Optional[EntityID] = None
self._threshold_to_rest: int = develop_data.get_value("threshold_to_rest", 100)

match self.scenario_info.get_mode():
case Mode.NON_PRECOMPUTE:
self._path_planning: PathPlanning = cast(
PathPlanning,
self.module_manager.get_module(
"DefaultExtendActionMove.PathPlanning",
"adf_core_python.implement.module.astar_path_planning.AStarPathPlanning",
),
)
case Mode.PRECOMPUTATION:
pass
case Mode.PRECOMPUTED:
pass

def precompute(self, precompute_data: PrecomputeData) -> ExtAction:
super().precompute(precompute_data)
if self.get_count_precompute() > 1:
return self
self._path_planning.precompute(precompute_data)
return self

def resume(self, precompute_data: PrecomputeData) -> ExtAction:
super().resume(precompute_data)
if self.get_count_resume() > 1:
return self
self._path_planning.resume(precompute_data)
return self

def prepare(self) -> ExtAction:
super().prepare()
if self.get_count_prepare() > 1:
return self
self._path_planning.prepare()
return self

def update_info(self, message_manager: MessageManager) -> ExtAction:
super().update_info(message_manager)
if self.get_count_update_info() > 1:
return self
self._path_planning.update_info(message_manager)
return self

def set_target_entity_id(self, target_entity_id: EntityID) -> ExtAction:
entity: Optional[Entity] = self.world_info.get_entity(target_entity_id)
self._target_entity_id = None

if entity is None:
return self

if isinstance(entity, Blockade):
entity = self.world_info.get_entity(cast(Blockade, entity).get_position())
elif isinstance(entity, Human):
entity = entity.get_position()

if entity is None and isinstance(entity, Area):
self._target_entity_id = None

return self

def calc(self) -> ExtAction:
self._result = None
agent: Human = cast(Human, self.agent_info.get_myself())

path: list[EntityID] = self._path_planning.get_path(
agent.get_position(), self._target_entity_id
)

if path is not None or len(path) != 0:
self.result = ActionMove(path)

return self
Loading