Skip to content

Commit

Permalink
Merge pull request #8 from clearpathrobotics/wip-iterable-sensors
Browse files Browse the repository at this point in the history
Iterable Sensors
  • Loading branch information
luis-camero committed Apr 17, 2023
2 parents 13c46a1 + 5fc1667 commit 6a75659
Show file tree
Hide file tree
Showing 15 changed files with 1,297 additions and 62 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
Expand Up @@ -16,7 +16,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.8", "3.9", "3.10"]
python-version: ["3.10"]

steps:
- uses: actions/checkout@v3
Expand Down
3 changes: 2 additions & 1 deletion clearpath_config/clearpath_config.py
@@ -1,6 +1,7 @@
from clearpath_config.system.system import SystemConfig
from clearpath_config.platform.platform import PlatformConfig
from clearpath_config.mounts.mounts import MountsConfig
from clearpath_config.sensors.sensors import SensorConfig


# ClearpathConfig:
Expand All @@ -13,4 +14,4 @@ def __init__(self, config: dict = None) -> None:
self.system = SystemConfig()
self.platform = PlatformConfig()
self.mounts = MountsConfig()
# self.sensors = SensorsConfig()
self.sensors = SensorConfig()
44 changes: 44 additions & 0 deletions clearpath_config/common.py
Expand Up @@ -135,6 +135,50 @@ def assert_valid(ip: str) -> None:
"IP '%s' entries must in range 0 to 255" % ip)


# Port
# - TCP Port
class Port:
def __init__(self, port: int) -> None:
self.assert_valid(port)
self.port = port

def __str__(self) -> str:
return str(self.port)

def __int__(self) -> int:
return self.port

def __eq__(self, other: object) -> bool:
try:
port = int(self)
other = int(other)
except Exception:
return False
return port == other

@staticmethod
def is_valid(port: int) -> None:
# Must be an integer
try:
port = int(port)
except Exception:
return False
# Must be in Range
return 0 <= port < 65536

@staticmethod
def assert_valid(port: int) -> None:
# Must be an integer
try:
port = int(port)
except ValueError as e:
raise AssertionError(e.args)
# Must be in Range
assert 0 <= port < 65536, (
"Port '%s' must be between 0 and 65535" % port
)


# File
# - file class
class File:
Expand Down
2 changes: 1 addition & 1 deletion clearpath_config/mounts/mounts.py
Expand Up @@ -102,7 +102,7 @@ def get_fath_pivot(
self,
idx: int
) -> FathPivot:
self.__fath_pivots.get(idx)
return self.__fath_pivots.get(idx)

# FathPivot: Get All
def get_fath_pivots(
Expand Down
233 changes: 232 additions & 1 deletion clearpath_config/parser.py
Expand Up @@ -19,6 +19,17 @@
from clearpath_config.platform.platform import PlatformConfig
from clearpath_config.platform.a200 import A200DecorationsConfig
from clearpath_config.platform.j100 import J100DecorationsConfig
from clearpath_config.sensors.sensors import (
Sensor,
BaseSensor,
Camera,
BaseCamera,
FlirBlackfly,
IntelRealsense,
Lidar2D,
BaseLidar2D,
SensorConfig
)
from clearpath_config.system.system import SystemConfig, HostsConfig, Host
from typing import List
import os
Expand Down Expand Up @@ -270,7 +281,6 @@ class MountParser(BaseConfigParser):
class Base(BaseConfigParser):
# Keys
MODEL = "model"
MOUNTING_LINK = "mounting_link"

def __new__(cls, config: dict) -> BaseMount:
parent = cls.get_optional_val(
Expand Down Expand Up @@ -456,6 +466,225 @@ def get_mounts(cls, config: dict, model: str) -> List[Mount]:
return models


class BaseSensorParser(BaseConfigParser):
# Keys
URDF_ENABLED = "urdf_enabled"
LAUNCH_ENABLED = "launch_enabled"

def __new__(cls, config: dict) -> BaseSensor:
parent = cls.get_optional_val(
AccessoryParser.PARENT, config, AccessoryParser.PARENT)
xyz = cls.get_optional_val(
AccessoryParser.XYZ, config, Accessory.XYZ)
rpy = cls.get_optional_val(
AccessoryParser.RPY, config, Accessory.RPY)
urdf_enabled = cls.get_optional_val(
BaseSensorParser.URDF_ENABLED, config, BaseSensor.URDF_ENABLED)
launch_enabled = cls.get_optional_val(
BaseSensorParser.LAUNCH_ENABLED, config, BaseSensor.LAUNCH_ENABLED)
return BaseSensor(
parent=parent,
xyz=xyz, rpy=rpy,
urdf_enabled=urdf_enabled,
launch_enabled=launch_enabled
)


class BaseLidar2DParser(BaseConfigParser):
# Keys
IP = "ip"
PORT = "port"
MIN_ANGLE = "min_angle"
MAX_ANGLE = "max_angle"

def __new__(cls, config: dict) -> BaseLidar2D:
sensor = BaseSensorParser(config)
ip = cls.get_optional_val(
BaseLidar2DParser.IP, config, BaseLidar2D.IP_ADDRESS)
port = cls.get_optional_val(
BaseLidar2DParser.PORT, config, BaseLidar2D.IP_PORT)
min_angle = cls.get_optional_val(
BaseLidar2DParser.MIN_ANGLE, config, BaseLidar2D.MIN_ANGLE)
max_angle = cls.get_optional_val(
BaseLidar2DParser.MAX_ANGLE, config, BaseLidar2D.MAX_ANGLE)
return BaseLidar2D(
parent=sensor.get_parent(),
xyz=sensor.get_xyz(),
rpy=sensor.get_rpy(),
urdf_enabled=sensor.get_urdf_enabled(),
launch_enabled=sensor.get_launch_enabled(),
ip=ip,
port=port,
min_angle=min_angle,
max_angle=max_angle
)


class Lidar2DParser(BaseConfigParser):
MODEL = "model"

def __new__(cls, config: dict) -> BaseLidar2D:
base = BaseLidar2DParser(config)
model = cls.get_required_val(Lidar2DParser.MODEL, config)
lidar2d = Lidar2D(model)
# Set Base Parameters
lidar2d.set_parent(base.get_parent())
lidar2d.set_xyz(base.get_xyz())
lidar2d.set_rpy(base.get_rpy())
lidar2d.set_urdf_enabled(base.get_urdf_enabled())
lidar2d.set_launch_enabled(base.get_launch_enabled())
lidar2d.set_ip(base.get_ip())
lidar2d.set_port(base.get_port())
lidar2d.set_min_angle(base.get_min_angle())
lidar2d.set_max_angle(base.get_max_angle())
return lidar2d


class BaseCameraParser(BaseConfigParser):
FPS = "fps"
SERIAL = "serial"

def __new__(cls, config: dict) -> BaseCamera:
sensor = BaseSensorParser(config)
fps = cls.get_optional_val(
BaseCameraParser.FPS, config, BaseCamera.FPS)
serial = cls.get_optional_val(
BaseCameraParser.SERIAL, config, BaseCamera.SERIAL)
return BaseCamera(
parent=sensor.get_parent(),
xyz=sensor.get_xyz(),
rpy=sensor.get_rpy(),
urdf_enabled=sensor.get_urdf_enabled(),
launch_enabled=sensor.get_launch_enabled(),
fps=fps,
serial=serial
)


class CameraParser(BaseConfigParser):
MODEL = "model"

# Blackfly Parameters
CONNECTION_TYPE = "connection_type"
ENCODING = "encoding"

# Realsense Parameters
WIDTH = "width"
HEIGHT = "height"
DEPTH_ENABLED = "depth_enabled"
DEPTH_FPS = "depth_fps"
DEPTH_WIDTH = "depth_width"
DEPTH_HEIGHT = "depth_height"

def __new__(cls, config: dict) -> BaseLidar2D:
base = BaseCameraParser(config)
model = cls.get_required_val(CameraParser.MODEL, config)
camera = Camera(model)
# Set Base Parameters
camera.set_parent(base.get_parent())
camera.set_xyz(base.get_xyz())
camera.set_rpy(base.get_rpy())
camera.set_urdf_enabled(base.get_urdf_enabled())
camera.set_launch_enabled(base.get_launch_enabled())
camera.set_fps(base.get_fps())
camera.set_serial(base.get_serial())
# Set Specific Parameters
if model == Camera.FLIR_BLACKFLY:
camera.set_connection_type(
cls.get_optional_val(
CameraParser.CONNECTION_TYPE,
config,
FlirBlackfly.CONNECTION_TYPE
)
)
camera.set_encoding(
cls.get_optional_val(
CameraParser.ENCODING,
config,
FlirBlackfly.BAYER_RG8
)
)
elif model == Camera.INTEL_REALSENSE:
camera.set_width(
cls.get_optional_val(
CameraParser.WIDTH,
config,
IntelRealsense.WIDTH
)
)
camera.set_height(
cls.get_optional_val(
CameraParser.HEIGHT,
config,
IntelRealsense.HEIGHT
)
)
camera.set_depth_enabled(
cls.get_optional_val(
CameraParser.DEPTH_ENABLED,
config,
IntelRealsense.DEPTH_ENABLED
)
)
camera.set_depth_fps(
cls.get_optional_val(
CameraParser.DEPTH_FPS,
config,
IntelRealsense.DEPTH_FPS
)
)
camera.set_depth_width(
cls.get_optional_val(
CameraParser.DEPTH_WIDTH,
config,
IntelRealsense.DEPTH_WIDTH
)
)
camera.set_depth_height(
cls.get_optional_val(
CameraParser.DEPTH_HEIGHT,
config,
IntelRealsense.DEPTH_HEIGHT
)
)
return camera


class SensorParser(BaseConfigParser):

def __new__(cls, model: str, config: dict) -> BaseSensor:
Sensor.assert_type(model)
if model == Sensor.LIDAR2D:
return Lidar2DParser(config)
elif model == Sensor.CAMERA:
return CameraParser(config)


class SensorConfigParser(BaseConfigParser):
# Key
SENSORS = "sensors"

def __new__(cls, config: dict) -> SensorConfig:
snrconfig = SensorConfig()
# Sensors
sensors = cls.get_optional_val(cls.SENSORS, config)
if not sensors:
return snrconfig
# Lidar2D
snrconfig.set_all_lidar_2d(cls.get_sensors(sensors, Sensor.LIDAR2D))
# Camera
snrconfig.set_all_camera(cls.get_sensors(sensors, Sensor.CAMERA))
return snrconfig

@classmethod
def get_sensors(cls, config: dict, model: str) -> List[BaseSensor]:
entries = cls.get_optional_val(model, config, [])
models = []
for entry in entries:
models.append(SensorParser(model, entry))
return models


# Clearpath Configuration Parser
class ClearpathConfigParser(BaseConfigParser):
@staticmethod
Expand Down Expand Up @@ -525,4 +754,6 @@ def __new__(self, config):
cprconfig.platform = PlatformConfigParser(config)
# MountConfig
cprconfig.mounts = MountsConfigParser(config)
# SensorConfig
cprconfig.sensors = SensorConfigParser(config)
return cprconfig

0 comments on commit 6a75659

Please sign in to comment.