From f3dd575846dc3a3a32c9fa9cbe07cb6ecfe79137 Mon Sep 17 00:00:00 2001 From: harderthan Date: Tue, 2 Jan 2024 22:38:07 +0900 Subject: [PATCH 1/2] bring upstream code with no modification --- .../simulator/vehicles/sensors/__init__.py | 5 + .../simulator/vehicles/sensors/sensor.py | 143 ++++++++++++++++++ 2 files changed, 148 insertions(+) create mode 100644 exts/stride.simulator/stride/simulator/vehicles/sensors/__init__.py create mode 100644 exts/stride.simulator/stride/simulator/vehicles/sensors/sensor.py diff --git a/exts/stride.simulator/stride/simulator/vehicles/sensors/__init__.py b/exts/stride.simulator/stride/simulator/vehicles/sensors/__init__.py new file mode 100644 index 0000000..c4ac2ee --- /dev/null +++ b/exts/stride.simulator/stride/simulator/vehicles/sensors/__init__.py @@ -0,0 +1,5 @@ +from .sensor import Sensor +from .barometer import Barometer +from .gps import GPS +from .imu import IMU +from .magnetometer import Magnetometer diff --git a/exts/stride.simulator/stride/simulator/vehicles/sensors/sensor.py b/exts/stride.simulator/stride/simulator/vehicles/sensors/sensor.py new file mode 100644 index 0000000..1bfa12c --- /dev/null +++ b/exts/stride.simulator/stride/simulator/vehicles/sensors/sensor.py @@ -0,0 +1,143 @@ +__all__ = ["Sensor"] + +from pegasus.simulator.logic.state import State + +class Sensor: + """The base class for implementing a sensor + + Attributes: + update_period (float): The period for each sensor update: update_period = 1 / update_rate (in s). + origin_lat (float): The latitude of the origin of the world in degrees (might get used by some sensors). + origin_lon (float): The longitude of the origin of the world in degrees (might get used by some sensors). + origin_alt (float): The altitude of the origin of the world relative to sea water level (might get used by some sensors) + """ + def __init__(self, sensor_type: str, update_rate: float): + """Initialize the Sensor class + + Args: + sensor_type (str): A name that describes the type of sensor + update_rate (float): The rate at which the data in the sensor should be refreshed (in Hz) + """ + + # Set the sensor type and update rate + self._sensor_type = sensor_type + self._update_rate = update_rate + self._update_period = 1.0 / self._update_rate + + # Auxiliar variables used to control whether to update the sensor or not given the time elapsed + self._first_update = True + self._total_time = 0.0 + + # Set the "configuration of the world" - some sensors might need it + self._origin_lat = -999 + self._origin_lon = -999 + self._origin_alt = 0.0 + + def initialize(self, origin_lat, origin_lon, origin_alt): + """Method that initializes the sensor latitude, longitude and altitude attributes. + + Note: + Given that some sensors require the knowledge of the latitude, longitude and altitude of the [0, 0, 0] coordinate + of the world, then we might as well just save this information for whatever sensor that comes + + Args: + origin_lat (float): The latitude of the origin of the world in degrees (might get used by some sensors). + origin_lon (float): The longitude of the origin of the world in degrees (might get used by some sensors). + origin_alt (float): The altitude of the origin of the world relative to sea water level (might get used by some sensors). + """ + self._origin_lat = origin_lat + self._origin_lon = origin_lon + self._origin_alt = origin_alt + + def set_update_rate(self, update_rate: float): + """Method that changes the update rate and period of the sensor + + Args: + update_rate (float): The new rate at which the data in the sensor should be refreshed (in Hz) + """ + self._update_rate = update_rate + self._update_period = 1.0 / self._update_rate + + def update_at_rate(fnc): + """Decorator function used to check if the time elapsed between the last sensor update call and the current + sensor update call is higher than the defined update_rate of the sensor. If so, we need to actually compute new + values to simulate a measurement of the sensor at a given rate. + + Args: + fnc (function): The function that we want to enforce a specific update rate. + + Examples: + >>> class GPS(Sensor): + >>> @Sensor.update_at_rate + >>> def update(self): + >>> (do some logic here) + + Returns: + [None, Dict]: This decorator function returns None if there was no data to be produced by the sensor at the + specified timestamp or a dict with the current state of the sensor otherwise. + """ + + # + + # Define a wrapper function so that the "self" of the object can be passed to the function as well + def wrapper(self, state: State, dt: float): + + # Add the total time passed between the last time the sensor was updated and the current call + self._total_time += dt + + # If it is time to update the sensor data, then just call the update function of the sensor + if self._total_time >= self._update_period or self._first_update: + + # Result of the update function for the sensor + result = fnc(self, state, self._total_time) + + # Reset the auxiliar counter variables + self._first_update = False + self._total_time = 0.0 + + return result + return None + return wrapper + + @property + def sensor_type(self): + """ + (str) A name that describes the type of sensor. + """ + return self._sensor_type + + @property + def update_rate(self): + """ + (float) The rate at which the data in the sensor should be refreshed (in Hz). + """ + return self._update_rate + + @property + def state(self): + """ + (dict) A dictionary which contains the data produced by the sensor at any given time. + """ + return None + + def update(self, state: State, dt: float): + """Method that should be implemented by the class that inherits Sensor. This is where the actual implementation + of the sensor should be performed. + + Args: + state (State): The current state of the vehicle. + dt (float): The time elapsed between the previous and current function calls (s). + + Returns: + (dict) A dictionary containing the current state of the sensor (the data produced by the sensor) + """ + pass + + def config_from_dict(self, config_dict): + """Method that should be implemented by the class that inherits Sensor. This is where the configuration of the + sensor based on a dictionary input should be performed. + + Args: + config_dict (dict): A dictionary containing the configurations of the sensor + """ + pass From 37f25ebcb5575500e7a17781ff58cddbea5c2d71 Mon Sep 17 00:00:00 2001 From: harderthan Date: Tue, 2 Jan 2024 23:16:36 +0900 Subject: [PATCH 2/2] clean up code --- .../simulator/vehicles/sensors/__init__.py | 4 - .../simulator/vehicles/sensors/sensor.py | 149 +++++++++++------- 2 files changed, 91 insertions(+), 62 deletions(-) diff --git a/exts/stride.simulator/stride/simulator/vehicles/sensors/__init__.py b/exts/stride.simulator/stride/simulator/vehicles/sensors/__init__.py index c4ac2ee..2bc7a35 100644 --- a/exts/stride.simulator/stride/simulator/vehicles/sensors/__init__.py +++ b/exts/stride.simulator/stride/simulator/vehicles/sensors/__init__.py @@ -1,5 +1 @@ from .sensor import Sensor -from .barometer import Barometer -from .gps import GPS -from .imu import IMU -from .magnetometer import Magnetometer diff --git a/exts/stride.simulator/stride/simulator/vehicles/sensors/sensor.py b/exts/stride.simulator/stride/simulator/vehicles/sensors/sensor.py index 1bfa12c..6d25ecc 100644 --- a/exts/stride.simulator/stride/simulator/vehicles/sensors/sensor.py +++ b/exts/stride.simulator/stride/simulator/vehicles/sensors/sensor.py @@ -3,97 +3,95 @@ from pegasus.simulator.logic.state import State class Sensor: - """The base class for implementing a sensor + """The base class for implementing a sensor. Attributes: - update_period (float): The period for each sensor update: update_period = 1 / update_rate (in s). - origin_lat (float): The latitude of the origin of the world in degrees (might get used by some sensors). - origin_lon (float): The longitude of the origin of the world in degrees (might get used by some sensors). - origin_alt (float): The altitude of the origin of the world relative to sea water level (might get used by some sensors) + update_period (float): The period for each sensor update: update_period = 1 / update_frequency (in seconds). + origin_latitude (float): The latitude of the origin of the world in degrees. + origin_longitude (float): The longitude of the origin of the world in degrees. + origin_altitude (float): The altitude of the origin of the world relative to sea water level in meters. """ - def __init__(self, sensor_type: str, update_rate: float): + def __init__(self, sensor_type: str, update_frequency: float): """Initialize the Sensor class Args: - sensor_type (str): A name that describes the type of sensor - update_rate (float): The rate at which the data in the sensor should be refreshed (in Hz) + sensor_type (str): A name that describes the type of sensor. + update_frequency (float): The rate at which the data in the sensor should be refreshed (in Hz). """ - # Set the sensor type and update rate + # Set the sensor type and update rate. self._sensor_type = sensor_type - self._update_rate = update_rate - self._update_period = 1.0 / self._update_rate + self._update_frequency = update_frequency + self._update_period = 1.0 / self._update_frequency - # Auxiliar variables used to control whether to update the sensor or not given the time elapsed + # Auxiliary variables used to control whether to update the sensor or not, given the time elapsed. self._first_update = True self._total_time = 0.0 - # Set the "configuration of the world" - some sensors might need it - self._origin_lat = -999 - self._origin_lon = -999 - self._origin_alt = 0.0 + # Set the spherical coordinate of the world - some sensors might need it. + self._origin_latitude = -999 + self._origin_longitude = -999 + self._origin_altitude = 0.0 + + def set_spherical_coordinate(self, origin_latitude, origin_longitude, origin_altitude): + """Method that initializes the sensor shperical coordinate attributes. - def initialize(self, origin_lat, origin_lon, origin_alt): - """Method that initializes the sensor latitude, longitude and altitude attributes. - Note: - Given that some sensors require the knowledge of the latitude, longitude and altitude of the [0, 0, 0] coordinate - of the world, then we might as well just save this information for whatever sensor that comes - + Given that some sensors require the knowledge of the latitude, longitude and altitude of the [0, 0, 0] + coordinate of the world, then we might as well just save this information for whatever sensor that comes. + Args: - origin_lat (float): The latitude of the origin of the world in degrees (might get used by some sensors). - origin_lon (float): The longitude of the origin of the world in degrees (might get used by some sensors). - origin_alt (float): The altitude of the origin of the world relative to sea water level (might get used by some sensors). + origin_latitude (float): The latitude of the origin of the world in degrees. + origin_longitude (float): The longitude of the origin of the world in degrees. + origin_altitude (float): The altitude of the origin of the world relative to sea water level in meters. """ - self._origin_lat = origin_lat - self._origin_lon = origin_lon - self._origin_alt = origin_alt + self._origin_latitude = origin_latitude + self._origin_longitude = origin_longitude + self._origin_altitude = origin_altitude - def set_update_rate(self, update_rate: float): - """Method that changes the update rate and period of the sensor + def set_update_frequency(self, frequency: float): + """Method that changes the update frequency and period of the sensor. Args: - update_rate (float): The new rate at which the data in the sensor should be refreshed (in Hz) + frequency (float): The new frequency at which the data in the sensor should be refreshed (in Hz). """ - self._update_rate = update_rate - self._update_period = 1.0 / self._update_rate + self._update_frequency = frequency + self._update_period = 1.0 / self._update_frequency - def update_at_rate(fnc): - """Decorator function used to check if the time elapsed between the last sensor update call and the current - sensor update call is higher than the defined update_rate of the sensor. If so, we need to actually compute new - values to simulate a measurement of the sensor at a given rate. + def update_at_frequency(fnc): + """Decorator function used to check if the time elapsed between the last sensor update call and the current + sensor update call is higher than the defined 'update_frequency' of the sensor. If so, we need to actually + compute new values to simulate a measurement of the sensor at a given frequency. Args: fnc (function): The function that we want to enforce a specific update rate. Examples: >>> class GPS(Sensor): - >>> @Sensor.update_at_rate + >>> @Sensor.update_at_frequency >>> def update(self): >>> (do some logic here) Returns: [None, Dict]: This decorator function returns None if there was no data to be produced by the sensor at the - specified timestamp or a dict with the current state of the sensor otherwise. + specified timestamp or a Dict with the current state of the sensor otherwise. """ - - # - # Define a wrapper function so that the "self" of the object can be passed to the function as well - def wrapper(self, state: State, dt: float): + # Define a wrapper function so that the "self" of the object can be passed to the function as well. - # Add the total time passed between the last time the sensor was updated and the current call - self._total_time += dt + def wrapper(self, state: State, dt: float): + # Add the total time passed between the last time the sensor was updated and the current call. + self.total_time += dt - # If it is time to update the sensor data, then just call the update function of the sensor - if self._total_time >= self._update_period or self._first_update: + # If it is time to update the sensor data, then just call the update function of the sensor. + if self.total_time >= self.update_period or self.first_update: - # Result of the update function for the sensor - result = fnc(self, state, self._total_time) + # Result of the update function for the sensor. + result = fnc(self, state, self.total_time) # pylint: disable=not-callable, TODO: enable this. - # Reset the auxiliar counter variables - self._first_update = False - self._total_time = 0.0 + # Reset the auxiliary counter variables. + self.first_update = False + self.total_time = 0.0 return result return None @@ -107,18 +105,53 @@ def sensor_type(self): return self._sensor_type @property - def update_rate(self): + def update_frequency(self): """ (float) The rate at which the data in the sensor should be refreshed (in Hz). """ - return self._update_rate + return self._update_frequency + + @property + def update_period(self): + """ + (float) The period for each sensor update: update_period = 1 / update_frequency (in seconds). + """ + return self._update_period + + @property + def first_update(self): + """ + (bool) A flag that indicates whether this is the first time the sensor is being updated. + """ + return self._first_update + + @property + def total_time(self): + """ + (float) The total time elapsed since the last sensor update. + """ + return self._total_time + + @property + def origin_latitude(self): + """ + (float) The latitude of the origin of the world in degrees. + """ + return self._origin_latitude + + @property + def origin_longitude(self): + """ + (float) The longitude of the origin of the world in degrees. + """ + return self._origin_longitude @property - def state(self): + def origin_altitude(self): """ - (dict) A dictionary which contains the data produced by the sensor at any given time. + (float) The altitude of the origin of the world relative to sea water level in meters. """ - return None + return self._origin_altitude def update(self, state: State, dt: float): """Method that should be implemented by the class that inherits Sensor. This is where the actual implementation @@ -134,7 +167,7 @@ def update(self, state: State, dt: float): pass def config_from_dict(self, config_dict): - """Method that should be implemented by the class that inherits Sensor. This is where the configuration of the + """Method that should be implemented by the class that inherits Sensor. This is where the configuration of the sensor based on a dictionary input should be performed. Args: