Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Indexing #9

Merged
merged 6 commits into from Apr 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
85 changes: 53 additions & 32 deletions clearpath_config/common.py
Expand Up @@ -350,6 +350,45 @@ def assert_valid_link(self, link: str) -> None:
)


class IndexedAccessory(Accessory):

def __init__(
self,
idx: int = None,
name: str = None,
parent: str = Accessory.PARENT,
xyz: List[float] = Accessory.XYZ,
rpy: List[float] = Accessory.RPY
) -> None:
if name is None:
name = self.get_name_from_idx(0)
super().__init__(
name,
parent,
xyz,
rpy
)
# Index:
# - index of sensor
# - used to modify parameters to allow for multiple instances
# of the same sensor.
self.idx = 0
if idx is not None:
self.set_idx(idx)

@classmethod
def get_name_from_idx(idx):
return "accessory_%s" % idx

def get_idx(self) -> str:
return self.idx

def set_idx(self, idx: int) -> None:
assert isinstance(idx, int), "Index must be an integer"
assert idx >= 0, "Index must be a positive integer"
self.name = self.get_name_from_idx(idx)


# ListConfigs: Generic Types
T = TypeVar("T")
U = TypeVar("U")
Expand Down Expand Up @@ -474,28 +513,21 @@ def uid_level_row(T) -> tuple:


# OrderedListConfig
# - uid is an integer
# - index will be enforced to match uid
# - obj_to_idx
# - idx_to_obj
# - T must have the following methods:
# - get_idx(): return an index from members
# - set_idx(idx): set an index and update members
class OrderedListConfig(Generic[T]):

def __init__(
self,
# Unique identifier to index
obj_to_idx: Callable,
idx_to_obj: Callable
) -> None:
def __init__(self, start_idx: int = 0) -> None:
self.start_idx = start_idx
self.__list: List[T] = []
self.__obj_to_idx: Callable = obj_to_idx
self.__idx_to_obj: Callable = idx_to_obj

def find(
self,
obj: T | int
) -> int:
if isinstance(obj, self.__orig_class__.__args__[0]):
idx = self.__obj_to_idx(obj)
idx = obj.get_idx()
elif isinstance(obj, int):
idx = obj
else:
Expand All @@ -504,15 +536,16 @@ def find(
self.__orig_class__.__args__[0], int
)
)
if idx < len(self.__list):
if self.start_idx <= idx < len(self.__list) + self.start_idx:
return idx
else:
return None

def update(self):
for idx, obj in enumerate(self.__list):
if self.__obj_to_idx(obj) != idx:
self.__list[idx] = self.__idx_to_obj(obj, idx)
for raw_idx, obj in enumerate(self.__list):
idx = raw_idx + self.start_idx
if obj.get_idx() != idx:
self.__list[raw_idx].set_idx(idx)

def add(
self,
Expand All @@ -532,7 +565,7 @@ def replace(
assert idx is not None, (
"Object not found. Cannot be replaced"
)
self.__list[idx] = obj
self.__list[idx - self.start_idx] = obj
self.update()

def remove(
Expand All @@ -541,15 +574,15 @@ def remove(
) -> None:
idx = self.find(obj)
if idx is not None:
self.__list.remove(self.__list[idx])
self.__list.remove(self.__list[idx - self.start_idx])
self.update()

def get(
self,
obj: T | int,
) -> T:
idx = self.find(obj)
return None if idx is None else self.__list[idx]
return None if idx is None else self.__list[idx - self.start_idx]

def get_all(
self
Expand Down Expand Up @@ -583,15 +616,3 @@ def set_all(
except AssertionError:
self.__list = tmp_list
self.update()

# Name as Unique ID to Index
@staticmethod
def name_obj_to_idx(obj: T):
name = obj.get_name()
str_idx = name.split("_")[-1]
return int(str_idx)

@staticmethod
def name_idx_to_obj(obj: T, idx: int):
obj.set_name(obj.get_name_from_idx(idx))
return obj
9 changes: 5 additions & 4 deletions clearpath_config/mounts/base.py
@@ -1,18 +1,19 @@
from clearpath_config.common import Accessory
from clearpath_config.common import Accessory, IndexedAccessory
from typing import List


class BaseMount(Accessory):
class BaseMount(IndexedAccessory):
MOUNT_MODEL = "base_mount"

def __init__(
self,
name: str = "base_mount_0",
idx: int = None,
name: str = None,
parent: str = Accessory.PARENT,
xyz: List[float] = Accessory.XYZ,
rpy: List[float] = Accessory.RPY,
) -> None:
super().__init__(name, parent, xyz, rpy)
super().__init__(idx, name, parent, xyz, rpy)

@classmethod
def get_mount_model(cls) -> str:
Expand Down
8 changes: 4 additions & 4 deletions clearpath_config/mounts/fath_pivot.py
Expand Up @@ -18,10 +18,10 @@ def __init__(
rpy: List[float] = Accessory.RPY,
) -> None:
super().__init__(
FathPivot.get_name_from_idx(0),
parent,
xyz,
rpy)
name=FathPivot.get_name_from_idx(0),
parent=parent,
xyz=xyz,
rpy=rpy)
self.angle = 0.0
if angle:
self.set_angle(angle)
Expand Down
8 changes: 4 additions & 4 deletions clearpath_config/mounts/flir_ptu.py
Expand Up @@ -29,10 +29,10 @@ def __init__(
limits_enabled: bool = LIMITS_ENABLED,
) -> None:
super().__init__(
FlirPTU.get_name_from_idx(0),
parent,
xyz,
rpy,
name=FlirPTU.get_name_from_idx(0),
parent=parent,
xyz=xyz,
rpy=rpy,
)
# Serial Port
self.tty_port = File(self.TTY_PORT)
Expand Down
28 changes: 4 additions & 24 deletions clearpath_config/mounts/mounts.py
Expand Up @@ -32,33 +32,13 @@ def __new__(cls, model: str) -> BaseMount:
class MountsConfig:
def __init__(self) -> None:
# Fath Pivot
self.__fath_pivots = (
OrderedListConfig[FathPivot](
OrderedListConfig.name_obj_to_idx,
OrderedListConfig.name_idx_to_obj
)
)
self.__fath_pivots = OrderedListConfig[FathPivot]()
# Flir PTU
self.__flir_ptus = (
OrderedListConfig[FlirPTU](
OrderedListConfig.name_obj_to_idx,
OrderedListConfig.name_idx_to_obj
)
)
self.__flir_ptus = OrderedListConfig[FlirPTU]()
# PACS Riser
self.__pacs_risers = (
OrderedListConfig[PACS.Riser](
OrderedListConfig.name_obj_to_idx,
OrderedListConfig.name_idx_to_obj
)
)
self.__pacs_risers = OrderedListConfig[PACS.Riser]()
# PACS Brackets
self.__pacs_brackets = (
OrderedListConfig[PACS.Bracket](
OrderedListConfig.name_obj_to_idx,
OrderedListConfig.name_idx_to_obj
)
)
self.__pacs_brackets = OrderedListConfig[PACS.Bracket]()

# Get All Mounts
def get_all_mounts(self) -> List[BaseMount]:
Expand Down
10 changes: 8 additions & 2 deletions clearpath_config/parser.py
Expand Up @@ -185,7 +185,9 @@ def __new__(cls, config: dict) -> A200DecorationsConfig:
dcnparser = DecorationsConfigParser
# Decorations
decorations = (
dcnparser.get_required_val(dcnparser.DECORATIONS, config))
dcnparser.get_optional_val(dcnparser.DECORATIONS, config))
if decorations is None:
return dcnconfig
# Decorations.Front_Bumper
dcnconfig.set_bumper(
BumperConfigParser(cls.FRONT_BUMPER, decorations))
Expand All @@ -207,7 +209,9 @@ def __new__(cls, config: dict) -> J100DecorationsConfig:
dcnparser = DecorationsConfigParser
# Decorations
decorations = (
dcnparser.get_required_val(dcnparser.DECORATIONS, config))
dcnparser.get_optional_val(dcnparser.DECORATIONS, config))
if decorations is None:
return dcnconfig
# Decorations.Front_Bumper
dcnconfig.set_bumper(
BumperConfigParser(cls.FRONT_BUMPER, decorations))
Expand Down Expand Up @@ -443,6 +447,8 @@ def __new__(cls, config: dict) -> MountsConfig:
mntconfig = MountsConfig()
# Mounts
mounts = cls.get_optional_val(cls.MOUNTS, config)
if mounts is None:
return mntconfig
mntconfig.set_fath_pivots(cls.get_mounts(mounts, Mount.FATH_PIVOT))
mntconfig.set_flir_ptus(cls.get_mounts(mounts, Mount.FLIR_PTU))
mntconfig.set_risers(cls.get_mounts(mounts, Mount.PACS_RISER))
Expand Down
14 changes: 14 additions & 0 deletions clearpath_config/sample/a200_config_empty.yaml
@@ -0,0 +1,14 @@
version: 0

system: # These are system level configs
self: A200-1
hosts: # These are the hosts that are involved in this system
platform: # The main computer for this system, ie the robot's computer
A200-1: 192.168.131.1
ros2:
namespace: HOSTNAME
domain_id: 123
rmw_implementation: rmw_fastrtps_cpp

platform: # These are are parameters specific to a a platform
serial_number: cpr-a200-0001
34 changes: 25 additions & 9 deletions clearpath_config/sensors/base.py
@@ -1,30 +1,30 @@
from clearpath_config.common import Accessory
from clearpath_config.common import Accessory, IndexedAccessory
from typing import List


class BaseSensor(Accessory):
class BaseSensor(IndexedAccessory):
"""
Base Sensor Class
- inherits from Accessory.
- contains all common parameters shared by all sensors.
"""
SENSOR_TYPE = "base_sensor"
NAME = SENSOR_TYPE + "_0"
TOPIC = ""
SENSOR_TYPE = "generic"
SENSOR_MODEL = "base"
TOPIC = "base"
URDF_ENABLED = True
LAUNCH_ENABLED = True

def __init__(
self,
name: str = NAME,
idx: int = None,
name: str = None,
topic: str = TOPIC,
urdf_enabled: bool = URDF_ENABLED,
launch_enabled: bool = LAUNCH_ENABLED,
parent: str = Accessory.PARENT,
xyz: List[float] = Accessory.XYZ,
rpy: List[float] = Accessory.RPY,
) -> None:
super().__init__(name, parent, xyz, rpy)
# Topic:
# - should be automatically determined by the sensor's index
# - should match the Clearpath API
Expand All @@ -38,18 +38,34 @@ def __init__(
# - enables the sensor launch in the generated launch
self.launch_enabled = True
self.enable_launch if launch_enabled else self.disable_launch()
super().__init__(idx, name, parent, xyz, rpy)

@classmethod
def get_sensor_model(cls) -> str:
def get_sensor_type(cls) -> str:
return cls.SENSOR_TYPE

@classmethod
def get_sensor_model(cls) -> str:
return cls.SENSOR_MODEL

@classmethod
def get_name_from_idx(cls, idx: int) -> str:
return "%s_%s" % (
cls.get_sensor_model(),
cls.get_sensor_type(),
idx
)

@classmethod
def get_topic_from_idx(cls, idx: int) -> str:
return "%s/%s" % (
cls.get_name_from_idx(idx),
cls.TOPIC
)

def set_idx(self, idx: int) -> None:
super().set_idx(idx)
self.topic = self.get_topic_from_idx(idx)

def get_topic(self) -> str:
return self.topic

Expand Down