diff --git a/README.md b/README.md
index 557e375f..c468763a 100644
--- a/README.md
+++ b/README.md
@@ -36,28 +36,32 @@ architecture.
## Overview of Repo Structure
-- `scos_actions/actions`: This includes the base Action class, signals, and the following
+- `scos_actions/actions`: This includes base Action classes and the following
common action classes:
- `acquire_single_freq_fft`: performs FFTs and calculates mean, median, min, max, and
sample statistics at a single center frequency.
- `acquire_single_freq_tdomain_iq`: acquires IQ data at a single center frequency.
- `acquire_stepped_freq_tdomain_iq`: acquires IQ data at multiple center frequencies.
- `calibrate_y_factor`: performs calibration using the Y-Factor method.
- - `sync_gps`: gets GPS location and syncs the host to GPS time
- `monitor_sigan`: ensures a signal analyzer is available and is able to maintain a
connection to the computer.
+ - `sync_gps`: gets GPS location and syncs the host to GPS time
+- `scos_actions/calibration`: This includes an interface for sensor calibration data
- `scos_actions/configs/actions`: This folder contains the YAML files with the parameters
used to initialize the actions described above.
- `scos_actions/discover`: This includes the code to read YAML files and make actions
- available to scos-sensor.
-- `scos_actions/hardware`: This includes the signal analyzer interface and GPS interface
- used by the actions and the mock signal analyzer. The signal analyzer interface is
- intended to represent universal functionality that is common across all signal
- analyzers. The specific implementations of the signal analyzer interface for
- particular signal analyzers are provided in separate repositories like
+ available to SCOS Sensor.
+- `scos_actions/hardware`: This includes the signal analyzer and GPS interfaces used by
+ actions and the mock signal analyzer. The signal analyzer interface represents functionality
+ common to all signal analyzers. Specific implementations of the signal analyzer interface
+ for particular signal analyzers are provided in separate repositories like
[scos-usrp](https://github.com/NTIA/scos-usrp).
+- `scos_actions/metadata`: This includes the `SigMFBuilder` class and related metadata
+ structures used to generate [SigMF](https://github.com/SigMF/SigMF)-compliant metadata.
- `scos_actions/signal_processing`: This contains various common signal processing
routines which are used in actions.
+- `scos_actions/status`: This provides a class to register objects with the SCOS Sensor
+ status endpoint.
## Running in SCOS Sensor
@@ -176,14 +180,11 @@ actions = {
"sync_gps": SyncGps(gps),
}
-yaml_actions, yaml_test_actions = init(sigan=sigan, yaml_dir=ACTION_DEFINITIONS_DIR)
+yaml_actions, yaml_test_actions = init(yaml_dir=ACTION_DEFINITIONS_DIR)
actions.update(yaml_actions)
```
-Pass the implementation of the signal analyzer interface and the directory where the
-YAML files are located to the `init` method.
-
If no existing action class meets your needs, see [Writing Custom Actions](
#writing-custom-actions).
@@ -300,11 +301,13 @@ You're done.
sensor owner wants the sensor to be able to *do*. At a lower level, they are simply
Python classes with a special method `__call__`. Actions use [Django Signals](
) to provide data and results to
-scos-sensor.
+SCOS Sensor.
Start by looking at the [`Action` base class](scos_actions/actions/interfaces/action.py).
It includes some logic to parse a description and summary out of the action class's
-docstring, and a `__call__` method that must be overridden.
+docstring, and a `__call__` method that must be overridden. Actions are only instantiated
+with parameters. The signal analyzer implementation will be passed to the action at
+execution time through the __call__ method's Sensor object.
A new custom action can inherit from the existing action classes to reuse and build
upon existing functionality. A [`MeasurementAction` base class](scos_actions/actions/interfaces/measurement_action.py),
@@ -318,25 +321,21 @@ enables SCOS Sensor to do something with the results of the action. This could r
from storing measurement data to recycling a Docker container or to fixing an unhealthy
connection to the signal analyzer. You can see the available signals in
[`scos_actions/signals.py`](scos_actions/signals.py).
-The following signals are currently offered:
+The following signals are currently offered for actions:
- `measurement_action_completed` - signal expects task_id, data, and metadata
- `location_action_completed` - signal expects latitude and longitude
- `trigger_api_restart` - triggers a restart of the API docker container (where
-scos-sensor runs)
+SCOS Sensor runs)
New signals can be added. However, corresponding signal handlers must be added to
-scos-sensor to receive the signals and process the results.
+SCOS Sensor to receive the signals and process the results.
##### Adding custom action to SCOS Actions
A custom action meant to be re-used by other plugins can live in SCOS Actions. It can
be instantiated using a YAML file, or directly in the `actions` dictionary in the
-`discover/__init__.py` module. This can be done in SCOS Actions with a mock signal
-analyzer. Plugins supporting other hardware would need to import the action from
-SCOS Actions. Then it can be instantiated in that plugin’s actions dictionary in its
-discover module, or in a YAML file living in that plugin (as long as its discover
-module includes the required code to parse the YAML files).
+`discover/__init__.py` module.
##### Adding system or hardware specific custom action
@@ -349,7 +348,7 @@ above.
### Supporting a Different Signal Analyzer
[scos_usrp](https://github.com/NTIA/scos-usrp) adds support for the Ettus B2xx line of
-signal analyzers to `scos-sensor`. Follow these instructions to add support for
+signal analyzers to SCOS Sensor. Follow these instructions to add support for
another signal analyzer with a Python API.
- Create a new repository called `scos-[signal analyzer name]`.
@@ -372,16 +371,14 @@ another signal analyzer with a Python API.
custom actions that are unique to the hardware. See [Adding Actions](#adding-actions)
subsection above.
- In the new repository, add a `discover/__init__.py` file. This should contain a
- dictionary called `actions` with a key of action name and a value of action object.
+ dictionary called `actions` with keys of action names and values of action instances.
+ If the repository also includes new action implementations, it should also expose a
+ dictionary named `action_classes` with keys of actions names and values of action classes.
You can use the [init()](scos_actions/discover/__init__.py) and/or the
[load_from_yaml()](scos_actions/discover/yaml.py) methods provided in this repository
- to look for YAML files and initialize actions. These methods allow you to pass your
- new signal analyzer object to the action's constructor. You can use the existing
+ to look for YAML files and initialize actions. You can use the existing
action classes [defined in this repository](scos_actions/actions/__init__.py) or
- [create custom actions](#writing-custom-actions). If the signal analyzer supports
- calibration, you should also add a `get_last_calibration_time()` method to
- `discover/__init__.py` to enable the status endpoint to report the last calibration
- time.
+ [create custom actions](#writing-custom-actions).
If your signal analyzer doesn't have a Python API, you'll need a Python wrapper that
calls out to your signal analyzer's available API and reads the samples back into
@@ -389,7 +386,8 @@ Python. Libraries such as [SWIG](http://www.swig.org/) can automatically generat
Python wrappers for programs written in C/C++.
The next step in supporting a different signal analyzer is to create a class that
-inherits from the [GPSInterface](scos_actions/hardware/gps_iface.py) abstract class.
+inherits from the [GPSInterface](scos_actions/hardware/gps_iface.py) abstract class if
+the signal analyzer includes GPS capabilities.
Then add the `sync_gps` and `monitor_sigan` actions to your `actions` dictionary,
passing the gps object to the `SyncGps` constructor, and the signal analyzer object to
the `MonitorSignalAnalyzer` constructor. See the example in the [Adding Actions
@@ -407,7 +405,7 @@ specific drivers are required for your signal analyzer, you can attempt to link
within the package or create a docker image with the necessary files. You can host the
docker image as a [GitHub package](
-). Then, when running scos-sensor, set the environment variable
+). Then, when running SCOS Sensor, set the environment variable
`BASE_IMAGE=`.
## License
diff --git a/sample_debug.py b/sample_debug.py
index 8ff6d5b9..cc04ae29 100644
--- a/sample_debug.py
+++ b/sample_debug.py
@@ -6,6 +6,7 @@
from scos_actions.actions.acquire_single_freq_fft import SingleFrequencyFftAcquisition
from scos_actions.hardware.mocks.mock_sigan import MockSignalAnalyzer
+from scos_actions.hardware.sensor import Sensor
from scos_actions.signals import measurement_action_completed
parameters = {
@@ -17,6 +18,7 @@
"nffts": 300,
"nskip": 0,
"classification": "UNCLASSIFIED",
+ "calibration_adjust": False,
}
schedule_entry_json = {
"name": "test_m4s_multi_1",
@@ -50,10 +52,11 @@ def callback(sender, **kwargs):
measurement_action_completed.connect(callback)
-action = SingleFrequencyFftAcquisition(
- parameters=parameters, sigan=MockSignalAnalyzer(randomize_values=True)
+action = SingleFrequencyFftAcquisition(parameters)
+sensor = Sensor(
+ signal_analyzer=MockSignalAnalyzer(randomize_values=True), capabilities={}
)
-action(schedule_entry_json, 1)
+action(sensor, schedule_entry_json, 1)
print("metadata:")
print(json.dumps(_metadata, indent=4))
print("finished")
diff --git a/scos_actions/__init__.py b/scos_actions/__init__.py
index 54ccb5d2..73d4c8be 100644
--- a/scos_actions/__init__.py
+++ b/scos_actions/__init__.py
@@ -1 +1 @@
-__version__ = "7.1.0"
+__version__ = "8.0.0"
diff --git a/scos_actions/actions/acquire_sea_data_product.py b/scos_actions/actions/acquire_sea_data_product.py
index 17adb08c..3409699a 100644
--- a/scos_actions/actions/acquire_sea_data_product.py
+++ b/scos_actions/actions/acquire_sea_data_product.py
@@ -20,7 +20,6 @@
Currently in development.
"""
-import json
import logging
import lzma
import platform
@@ -39,9 +38,7 @@
from scos_actions import __version__ as SCOS_ACTIONS_VERSION
from scos_actions import utils
from scos_actions.actions.interfaces.action import Action
-from scos_actions.capabilities import SENSOR_DEFINITION_HASH, SENSOR_LOCATION
-from scos_actions.hardware import preselector, switches
-from scos_actions.hardware.mocks.mock_gps import MockGPS
+from scos_actions.hardware.sensor import Sensor
from scos_actions.hardware.utils import (
get_cpu_uptime_seconds,
get_current_cpu_clock_speed,
@@ -77,7 +74,6 @@
create_statistical_detector,
)
from scos_actions.signals import measurement_action_completed, trigger_api_restart
-from scos_actions.status import start_time
from scos_actions.utils import convert_datetime_to_millisecond_iso_format, get_days_up
env = Env()
@@ -121,7 +117,9 @@
# Create power detectors
TD_DETECTOR = create_statistical_detector("TdMeanMaxDetector", ["max", "mean"])
-FFT_M3_DETECTOR = create_statistical_detector("FftM3Detector", ["max", "mean", "median"])
+FFT_M3_DETECTOR = create_statistical_detector(
+ "FftM3Detector", ["max", "mean", "median"]
+)
PFP_M3_DETECTOR = create_statistical_detector("PfpM3Detector", ["min", "max", "mean"])
@@ -158,7 +156,7 @@ def __init__(
# Compute the amplitude shift for PSD scaling. The FFT result
# is in pseudo-power log units and must be scaled to a PSD.
self.fft_scale_factor = (
- - 10.0 * np.log10(impedance_ohms) # Pseudo-power to power
+ -10.0 * np.log10(impedance_ohms) # Pseudo-power to power
+ 27.0 # Watts to dBm (+30) and baseband to RF (-3)
- 10.0 * np.log10(sample_rate_Hz * fft_size) # PSD scaling
+ 20.0 * np.log10(window_ecf) # Window energy correction
@@ -178,7 +176,9 @@ def run(self, iq: ray.ObjectRef) -> np.ndarray:
)
# Power in Watts
fft_amplitudes = calculate_pseudo_power(fft_amplitudes)
- fft_result = apply_statistical_detector(fft_amplitudes, self.detector) # (max, mean, median)
+ fft_result = apply_statistical_detector(
+ fft_amplitudes, self.detector
+ ) # (max, mean, median)
percentile_result = np.percentile(fft_amplitudes, self.percentiles, axis=0)
fft_result = np.vstack((fft_result, percentile_result))
fft_result = np.fft.fftshift(fft_result, axes=(1,)) # Shift frequencies
@@ -449,10 +449,8 @@ class NasctnSeaDataProduct(Action):
:param sigan: Instance of SignalAnalyzerInterface.
"""
- def __init__(self, parameters, sigan, gps=None):
- if gps is None:
- gps = MockGPS()
- super().__init__(parameters, sigan, gps)
+ def __init__(self, parameters: dict):
+ super().__init__(parameters)
# Assume preselector is present
rf_path_name = utils.get_parameter(RF_PATH, self.parameters)
self.rf_path = {self.PRESELECTOR_PATH_KEY: rf_path_name}
@@ -505,8 +503,9 @@ def __init__(self, parameters, sigan, gps=None):
# Get iterable parameter list
self.iteration_params = utils.get_iterable_parameters(self.parameters)
- def __call__(self, schedule_entry, task_id):
+ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
"""This is the entrypoint function called by the scheduler."""
+ self._sensor = sensor
action_start_tic = perf_counter()
_ = psutil.cpu_percent(interval=None) # Initialize CPU usage monitor
@@ -521,7 +520,7 @@ def __call__(self, schedule_entry, task_id):
schedule_entry,
self.iteration_params,
)
- self.create_global_sensor_metadata()
+ self.create_global_sensor_metadata(self.sensor)
self.create_global_data_product_metadata()
# Initialize remote supervisor actors for IQ processing
@@ -607,7 +606,7 @@ def __call__(self, schedule_entry, task_id):
self.sigmf_builder.set_median_channel_powers(median_ch_pwrs)
# Get diagnostics last to record action runtime
self.capture_diagnostics(
- action_start_tic, cpu_speed
+ self.sensor, action_start_tic, cpu_speed
) # Add diagnostics to metadata
measurement_action_completed.send(
@@ -627,17 +626,23 @@ def capture_iq(self, params: dict) -> dict:
nskip = utils.get_parameter(NUM_SKIP, params)
num_samples = int(params[SAMPLE_RATE] * duration_ms * 1e-3)
# Collect IQ data
- measurement_result = self.sigan.acquire_time_domain_samples(num_samples, nskip)
+ measurement_result = self.sensor.signal_analyzer.acquire_time_domain_samples(
+ num_samples, nskip
+ )
# Store some metadata with the IQ
measurement_result.update(params)
- measurement_result["sensor_cal"] = self.sigan.sensor_calibration_data
+ measurement_result[
+ "sensor_cal"
+ ] = self.sensor.signal_analyzer.sensor_calibration_data
toc = perf_counter()
logger.debug(
f"IQ Capture ({duration_ms} ms @ {(params[FREQUENCY]/1e6):.1f} MHz) completed in {toc-tic:.2f} s."
)
return measurement_result
- def capture_diagnostics(self, action_start_tic: float, cpu_speeds: list) -> None:
+ def capture_diagnostics(
+ self, sensor: Sensor, action_start_tic: float, cpu_speeds: list
+ ) -> None:
"""
Capture diagnostic sensor data.
@@ -701,7 +706,7 @@ def capture_diagnostics(self, action_start_tic: float, cpu_speeds: list) -> None
switch_diag = {}
all_switch_status = {}
# Add status for any switch
- for switch in switches.values():
+ for switch in self.sensor.switches.values():
switch_status = switch.get_status()
del switch_status["name"]
del switch_status["healthy"]
@@ -715,7 +720,7 @@ def capture_diagnostics(self, action_start_tic: float, cpu_speeds: list) -> None
switch_diag["door_closed"] = not bool(all_switch_status["door_state"])
# Read preselector sensors
- ps_diag = preselector.get_status()
+ ps_diag = sensor.preselector.get_status()
del ps_diag["name"]
del ps_diag["healthy"]
@@ -756,12 +761,12 @@ def capture_diagnostics(self, action_start_tic: float, cpu_speeds: list) -> None
logger.warning("Failed to get CPU overheating status")
try: # SCOS start time
cpu_diag["software_start"] = convert_datetime_to_millisecond_iso_format(
- start_time
+ self.sensor.start_time
)
except:
logger.warning("Failed to get SCOS start time")
try: # SCOS uptime
- cpu_diag["software_uptime"] = get_days_up()
+ cpu_diag["software_uptime"] = get_days_up(self.sensor.start_time)
except:
logger.warning("Failed to get SCOS uptime")
try: # SSD SMART data
@@ -777,11 +782,11 @@ def capture_diagnostics(self, action_start_tic: float, cpu_speeds: list) -> None
"scos_sensor_version": SCOS_SENSOR_GIT_TAG,
"scos_actions_version": SCOS_ACTIONS_VERSION,
"scos_sigan_plugin": ntia_diagnostics.ScosPlugin(
- name="scos_tekrsa", version=self.sigan.plugin_version
+ name="scos_tekrsa", version=self.sensor.signal_analyzer.plugin_version
),
"preselector_api_version": PRESELECTOR_API_VERSION,
- "sigan_firmware_version": self.sigan.firmware_version,
- "sigan_api_version": self.sigan.api_version,
+ "sigan_firmware_version": self.sensor.signal_analyzer.firmware_version,
+ "sigan_api_version": self.sensor.signal_analyzer.api_version,
}
toc = perf_counter()
@@ -830,7 +835,10 @@ def add_temperature_and_humidity_sensors(
logger.warning("No internal_temp found in switch status.")
try:
switch_diag["temperature_sensors"].append(
- {"name": "sigan_internal_temp", "value": self.sigan.temperature}
+ {
+ "name": "sigan_internal_temp",
+ "value": self.sensor.signal_analyzer.temperature,
+ }
)
except:
logger.warning("Unable to read internal sigan temperature")
@@ -942,7 +950,7 @@ def add_power_states(self, all_switch_status: dict, switch_diag: dict):
else:
logger.warning("No preselector_powered found in switch status.")
- def create_global_sensor_metadata(self):
+ def create_global_sensor_metadata(self, sensor: Sensor):
# Add (minimal) ntia-sensor metadata to the sigmf_builder:
# sensor ID, serial numbers for preselector, sigan, and computer
# overall sensor_spec version, e.g. "Prototype Rev. 3"
@@ -950,19 +958,19 @@ def create_global_sensor_metadata(self):
self.sigmf_builder.set_sensor(
ntia_sensor.Sensor(
sensor_spec=ntia_core.HardwareSpec(
- id=self.sensor_definition["sensor_spec"]["id"],
+ id=sensor.capabilities["sensor"]["sensor_spec"]["id"],
),
- sensor_sha512=SENSOR_DEFINITION_HASH,
+ sensor_sha512=sensor.capabilities["sensor"]["sensor_sha512"],
)
)
def test_required_components(self):
"""Fail acquisition if a required component is not available."""
- if not self.sigan.is_available:
+ if not self.sensor.signal_analyzer.is_available:
msg = "Acquisition failed: signal analyzer is not available"
trigger_api_restart.send(sender=self.__class__)
raise RuntimeError(msg)
- if not self.sigan.healthy():
+ if not self.sensor.signal_analyzer.healthy():
trigger_api_restart.send(sender=self.__class__)
return None
@@ -1001,7 +1009,8 @@ def create_global_data_product_metadata(self) -> None:
name="Power Spectral Density",
series=[d.value for d in FFT_M3_DETECTOR]
+ [
- f"{int(p)}th_percentile" if p.is_integer() else f"{p}th_percentile" for p in FFT_PERCENTILES
+ f"{int(p)}th_percentile" if p.is_integer() else f"{p}th_percentile"
+ for p in FFT_PERCENTILES
], # ["max", "mean", "median", "25th_percentile", "75th_percentile", ... "99.99th_percentile"]
length=int(FFT_SIZE * (5 / 7)),
x_units="Hz",
@@ -1108,7 +1117,7 @@ def create_capture_segment(
capture_segment = CaptureSegment(
sample_start=channel_idx * self.total_channel_data_length,
- frequency=self.sigan.frequency,
+ frequency=self.sensor.signal_analyzer.frequency,
datetime=measurement_result["capture_time"],
duration=measurement_result[DURATION_MS],
overload=measurement_result["overload"],
@@ -1120,9 +1129,9 @@ def create_capture_segment(
reference=DATA_REFERENCE_POINT,
),
sigan_settings=ntia_sensor.SiganSettings(
- reference_level=self.sigan.reference_level,
- attenuation=self.sigan.attenuation,
- preamp_enable=self.sigan.preamp_enable,
+ reference_level=self.sensor.signal_analyzer.reference_level,
+ attenuation=self.sensor.signal_analyzer.attenuation,
+ preamp_enable=self.sensor.signal_analyzer.preamp_enable,
),
)
self.sigmf_builder.add_capture(capture_segment)
@@ -1155,13 +1164,10 @@ def get_sigmf_builder(
# Do not include lengthy description
)
sigmf_builder.set_action(action_obj)
-
- if SENSOR_LOCATION is not None:
- sigmf_builder.set_geolocation(SENSOR_LOCATION)
+ if self.sensor.location is not None:
+ sigmf_builder.set_geolocation(self.sensor.location)
else:
- logger.error("Set the sensor location in the SCOS admin web UI")
- raise RuntimeError
-
+ raise Exception("Sensor does not have a location defined.")
sigmf_builder.set_data_type(self.is_complex(), bit_width=16, endianness="")
sigmf_builder.set_sample_rate(sample_rate_Hz)
sigmf_builder.set_num_channels(len(iter_params))
diff --git a/scos_actions/actions/acquire_single_freq_fft.py b/scos_actions/actions/acquire_single_freq_fft.py
index aaa37913..aa6f16ba 100644
--- a/scos_actions/actions/acquire_single_freq_fft.py
+++ b/scos_actions/actions/acquire_single_freq_fft.py
@@ -89,7 +89,6 @@
import logging
from numpy import float32, ndarray
-
from scos_actions.actions.interfaces.measurement_action import MeasurementAction
from scos_actions.hardware.mocks.mock_gps import MockGPS
from scos_actions.metadata.structs import ntia_algorithm
@@ -143,10 +142,8 @@ class SingleFrequencyFftAcquisition(MeasurementAction):
:param sigan: Instance of SignalAnalyzerInterface.
"""
- def __init__(self, parameters, sigan, gps=None):
- if gps is None:
- gps = MockGPS()
- super().__init__(parameters, sigan, gps)
+ def __init__(self, parameters: dict):
+ super().__init__(parameters)
# Pull parameters from action config
self.fft_size = get_parameter(FFT_SIZE, self.parameters)
self.nffts = get_parameter(NUM_FFTS, self.parameters)
@@ -180,9 +177,9 @@ def execute(self, schedule_entry: dict, task_id: int) -> dict:
# Save measurement results
measurement_result["data"] = m4s_result
measurement_result.update(self.parameters)
- measurement_result["calibration_datetime"] = self.sigan.sensor_calibration_data[
- "datetime"
- ]
+ measurement_result[
+ "calibration_datetime"
+ ] = self.sensor.signal_analyzer.sensor_calibration_data["datetime"]
measurement_result["task_id"] = task_id
measurement_result["classification"] = self.classification
diff --git a/scos_actions/actions/acquire_single_freq_tdomain_iq.py b/scos_actions/actions/acquire_single_freq_tdomain_iq.py
index 7943dfb7..0eb55655 100644
--- a/scos_actions/actions/acquire_single_freq_tdomain_iq.py
+++ b/scos_actions/actions/acquire_single_freq_tdomain_iq.py
@@ -34,12 +34,12 @@
import logging
from numpy import complex64
-
-from scos_actions import utils
from scos_actions.actions.interfaces.measurement_action import MeasurementAction
from scos_actions.hardware.mocks.mock_gps import MockGPS
from scos_actions.utils import get_parameter
+from scos_actions import utils
+
logger = logging.getLogger(__name__)
# Define parameter keys
@@ -71,10 +71,8 @@ class SingleFrequencyTimeDomainIqAcquisition(MeasurementAction):
:param sigan: instance of SignalAnalyzerInterface.
"""
- def __init__(self, parameters, sigan, gps=None):
- if gps is None:
- gps = MockGPS()
- super().__init__(parameters=parameters, sigan=sigan, gps=gps)
+ def __init__(self, parameters: dict):
+ super().__init__(parameters=parameters)
# Pull parameters from action config
self.nskip = get_parameter(NUM_SKIP, self.parameters)
self.duration_ms = get_parameter(DURATION_MS, self.parameters)
@@ -84,16 +82,16 @@ def __init__(self, parameters, sigan, gps=None):
def execute(self, schedule_entry: dict, task_id: int) -> dict:
# Use the sigan's actual reported instead of requested sample rate
- sample_rate = self.sigan.sample_rate
+ sample_rate = self.sensor.signal_analyzer.sample_rate
num_samples = int(sample_rate * self.duration_ms * 1e-3)
measurement_result = self.acquire_data(num_samples, self.nskip, self.cal_adjust)
end_time = utils.get_datetime_str_now()
measurement_result.update(self.parameters)
measurement_result["end_time"] = end_time
measurement_result["task_id"] = task_id
- measurement_result["calibration_datetime"] = self.sigan.sensor_calibration_data[
- "datetime"
- ]
+ measurement_result[
+ "calibration_datetime"
+ ] = self.sensor.signal_analyzer.sensor_calibration_data["datetime"]
measurement_result["classification"] = self.classification
sigan_settings = self.get_sigan_settings(measurement_result)
logger.debug(f"sigan settings:{sigan_settings}")
diff --git a/scos_actions/actions/acquire_stepped_freq_tdomain_iq.py b/scos_actions/actions/acquire_stepped_freq_tdomain_iq.py
index 314b245f..495be7e2 100644
--- a/scos_actions/actions/acquire_stepped_freq_tdomain_iq.py
+++ b/scos_actions/actions/acquire_stepped_freq_tdomain_iq.py
@@ -38,8 +38,6 @@
import logging
import numpy as np
-
-from scos_actions import utils
from scos_actions.actions.acquire_single_freq_tdomain_iq import (
CAL_ADJUST,
DURATION_MS,
@@ -47,12 +45,14 @@
NUM_SKIP,
SingleFrequencyTimeDomainIqAcquisition,
)
-from scos_actions.hardware.mocks.mock_gps import MockGPS
+from scos_actions.hardware.sensor import Sensor
from scos_actions.metadata.structs import ntia_sensor
from scos_actions.metadata.structs.capture import CaptureSegment
from scos_actions.signals import measurement_action_completed
from scos_actions.utils import get_parameter
+from scos_actions import utils
+
logger = logging.getLogger(__name__)
@@ -77,20 +77,16 @@ class SteppedFrequencyTimeDomainIqAcquisition(SingleFrequencyTimeDomainIqAcquisi
:param sigan: instance of SignalAnalyzerInterface
"""
- def __init__(self, parameters, sigan, gps=None):
- if gps is None:
- gps = MockGPS()
- super().__init__(parameters=parameters, sigan=sigan, gps=gps)
+ def __init__(self, parameters: dict):
+ super().__init__(parameters=parameters)
num_center_frequencies = len(parameters[FREQUENCY])
-
# Create iterable parameter set
self.iterable_params = utils.get_iterable_parameters(parameters)
-
- self.sigan = sigan # make instance variable to allow mocking
self.num_center_frequencies = num_center_frequencies
- def __call__(self, schedule_entry: dict, task_id: int):
+ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
"""This is the entrypoint function called by the scheduler."""
+ self._sensor = sensor
self.test_required_components()
saved_samples = 0
@@ -102,7 +98,7 @@ def __call__(self, schedule_entry: dict, task_id: int):
duration_ms = get_parameter(DURATION_MS, measurement_params)
nskip = get_parameter(NUM_SKIP, measurement_params)
cal_adjust = get_parameter(CAL_ADJUST, measurement_params)
- sample_rate = self.sigan.sample_rate
+ sample_rate = self.sensor.signal_analyzer.sample_rate
num_samples = int(sample_rate * duration_ms * 1e-3)
measurement_result = super().acquire_data(num_samples, nskip, cal_adjust)
measurement_result.update(measurement_params)
@@ -121,8 +117,8 @@ def __call__(self, schedule_entry: dict, task_id: int):
overload=measurement_result["overload"],
sigan_settings=sigan_settings,
)
- sigan_cal = self.sigan.sigan_calibration_data
- sensor_cal = self.sigan.sensor_calibration_data
+ sigan_cal = self.sensor.signal_analyzer.sigan_calibration_data
+ sensor_cal = self.sensor.signal_analyzer.sensor_calibration_data
if sigan_cal is not None:
if "1db_compression_point" in sigan_cal:
sigan_cal["compression_point"] = sigan_cal.pop(
diff --git a/scos_actions/actions/calibrate_y_factor.py b/scos_actions/actions/calibrate_y_factor.py
index bc975e65..d26bb743 100644
--- a/scos_actions/actions/calibrate_y_factor.py
+++ b/scos_actions/actions/calibrate_y_factor.py
@@ -75,13 +75,9 @@
import numpy as np
from scipy.constants import Boltzmann
from scipy.signal import sosfilt
-
-from scos_actions import utils
from scos_actions.actions.interfaces.action import Action
-from scos_actions.calibration import sensor_calibration, default_sensor_calibration
-from scos_actions.hardware.mocks.mock_gps import MockGPS
+from scos_actions.hardware.sensor import Sensor
from scos_actions.hardware.sigan_iface import SIGAN_SETTINGS_KEYS
-from scos_actions.settings import SENSOR_CALIBRATION_FILE
from scos_actions.signal_processing.calibration import (
get_linear_enr,
get_temperature,
@@ -91,14 +87,13 @@
generate_elliptic_iir_low_pass_filter,
get_iir_enbw,
)
-from scos_actions.signal_processing.power_analysis import (
- calculate_power_watts,
- create_statistical_detector,
-)
+from scos_actions.signal_processing.power_analysis import calculate_power_watts
from scos_actions.signal_processing.unit_conversion import convert_watts_to_dBm
from scos_actions.signals import trigger_api_restart
from scos_actions.utils import ParameterException, get_parameter
+from scos_actions import utils
+
logger = logging.getLogger(__name__)
# Define parameter keys
@@ -140,12 +135,9 @@ class YFactorCalibration(Action):
:param sigan: instance of SignalAnalyzerInterface.
"""
- def __init__(self, parameters, sigan, gps=None):
- if gps is None:
- gps = MockGPS()
+ def __init__(self, parameters: dict):
logger.debug("Initializing calibration action")
- super().__init__(parameters, sigan, gps)
- self.sigan = sigan
+ super().__init__(parameters)
self.iteration_params = utils.get_iterable_parameters(parameters)
# IIR Filter Setup
@@ -201,8 +193,9 @@ def __init__(self, parameters, sigan, gps=None):
"Only one set of IIR filter parameters may be specified (including sample rate)."
)
- def __call__(self, schedule_entry: dict, task_id: int):
+ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
"""This is the entrypoint function called by the scheduler."""
+ self.sensor = sensor
self.test_required_components()
detail = ""
@@ -214,7 +207,7 @@ def __call__(self, schedule_entry: dict, task_id: int):
detail += os.linesep + self.calibrate(p)
return detail
- def calibrate(self, params):
+ def calibrate(self, params: dict):
# Configure signal analyzer
self.configure_sigan(params)
@@ -230,25 +223,29 @@ def calibrate(self, params):
# Set noise diode on
logger.debug("Setting noise diode on")
- self.configure_preselector({RF_PATH: nd_on_state})
+ self.configure_preselector(params={RF_PATH: nd_on_state})
time.sleep(0.25)
# Get noise diode on IQ
logger.debug("Acquiring IQ samples with noise diode ON")
- noise_on_measurement_result = self.sigan.acquire_time_domain_samples(
- num_samples, num_samples_skip=nskip, cal_adjust=False
+ noise_on_measurement_result = (
+ self.sensor.signal_analyzer.acquire_time_domain_samples(
+ num_samples, num_samples_skip=nskip, cal_adjust=False
+ )
)
sample_rate = noise_on_measurement_result["sample_rate"]
# Set noise diode off
logger.debug("Setting noise diode off")
- self.configure_preselector({RF_PATH: nd_off_state})
+ self.configure_preselector(params={RF_PATH: nd_off_state})
time.sleep(0.25)
# Get noise diode off IQ
logger.debug("Acquiring IQ samples with noise diode OFF")
- noise_off_measurement_result = self.sigan.acquire_time_domain_samples(
- num_samples, num_samples_skip=nskip, cal_adjust=False
+ noise_off_measurement_result = (
+ self.sensor.signal_analyzer.acquire_time_domain_samples(
+ num_samples, num_samples_skip=nskip, cal_adjust=False
+ )
)
assert (
sample_rate == noise_off_measurement_result["sample_rate"]
@@ -262,21 +259,24 @@ def calibrate(self, params):
noise_on_data = sosfilt(self.iir_sos, noise_on_measurement_result["data"])
noise_off_data = sosfilt(self.iir_sos, noise_off_measurement_result["data"])
else:
- if default_sensor_calibration:
+ if self.sensor.signal_analyzer.sensor_calibration.is_default:
raise Exception(
"Calibrations without IIR filter cannot be performed with default calibration."
)
logger.debug("Skipping IIR filtering")
# Get ENBW from sensor calibration
- assert set(sensor_calibration.calibration_parameters) <= set(
+ assert set(
+ self.sensor.signal_analyzer.sensor_calibration.calibration_parameters
+ ) <= set(
sigan_params.keys()
), f"Action parameters do not include all required calibration parameters"
cal_args = [
- sigan_params[k] for k in sensor_calibration.calibration_parameters
+ sigan_params[k]
+ for k in self.sensor.signal_analyzer.sensor_calibration.calibration_parameters
]
- self.sigan.recompute_sensor_calibration_data(cal_args)
- enbw_hz = self.sigan.sensor_calibration_data["enbw"]
+ self.sensor.signal_analyzer.recompute_sensor_calibration_data(cal_args)
+ enbw_hz = self.sensor.signal_analyzer.sensor_calibration_data["enbw"]
noise_on_data = noise_on_measurement_result["data"]
noise_off_data = noise_off_measurement_result["data"]
@@ -285,20 +285,17 @@ def calibrate(self, params):
pwr_off_watts = calculate_power_watts(noise_off_data) / 2.0
# Y-Factor
- enr_linear = get_linear_enr(cal_source_idx)
- temp_k, temp_c, _ = get_temperature(temp_sensor_idx)
+ enr_linear = get_linear_enr(
+ preselector=self.sensor.preselector, cal_source_idx=cal_source_idx
+ )
+ temp_k, temp_c, _ = get_temperature(self.sensor.preselector, temp_sensor_idx)
noise_figure, gain = y_factor(
pwr_on_watts, pwr_off_watts, enr_linear, enbw_hz, temp_k
)
# Update sensor calibration with results
- sensor_calibration.update(
- sigan_params,
- utils.get_datetime_str_now(),
- gain,
- noise_figure,
- temp_c,
- SENSOR_CALIBRATION_FILE,
+ self.sensor.signal_analyzer.sensor_calibration.update(
+ sigan_params, utils.get_datetime_str_now(), gain, noise_figure, temp_c
)
# Debugging
@@ -382,9 +379,9 @@ def description(self):
def test_required_components(self):
"""Fail acquisition if a required component is not available."""
- if not self.sigan.is_available:
+ if not self.sensor.signal_analyzer.is_available:
msg = "acquisition failed: signal analyzer required but not available"
trigger_api_restart.send(sender=self.__class__)
raise RuntimeError(msg)
- if not self.sigan.healthy():
+ if not self.sensor.signal_analyzer.healthy():
trigger_api_restart.send(sender=self.__class__)
diff --git a/scos_actions/actions/interfaces/action.py b/scos_actions/actions/interfaces/action.py
index b3172374..496723dd 100644
--- a/scos_actions/actions/interfaces/action.py
+++ b/scos_actions/actions/interfaces/action.py
@@ -1,10 +1,9 @@
import logging
from abc import ABC, abstractmethod
from copy import deepcopy
+from typing import Optional
-from scos_actions.capabilities import SENSOR_LOCATION, capabilities
-from scos_actions.hardware import preselector
-from scos_actions.hardware.mocks.mock_gps import MockGPS
+from scos_actions.hardware.sensor import Sensor
from scos_actions.hardware.sigan_iface import SIGAN_SETTINGS_KEYS
from scos_actions.metadata.sigmf_builder import SigMFBuilder
from scos_actions.metadata.structs import ntia_scos, ntia_sensor
@@ -34,41 +33,39 @@ class Action(ABC):
PRESELECTOR_PATH_KEY = "rf_path"
- def __init__(self, parameters, sigan, gps=None):
- if gps is None:
- gps = MockGPS()
+ def __init__(self, parameters: dict):
+ self._sensor = None
self.parameters = deepcopy(parameters)
- self.sigan = sigan
- self.gps = gps
- self.sensor_definition = capabilities["sensor"]
self.sigmf_builder = None
- if (
- "preselector" in self.sensor_definition
- and "rf_paths" in self.sensor_definition["preselector"]
- ):
- self.has_configurable_preselector = True
- else:
- self.has_configurable_preselector = False
def configure(self, params: dict):
self.configure_sigan(params)
self.configure_preselector(params)
+ @property
+ def sensor(self):
+ return self._sensor
+
+ @sensor.setter
+ def sensor(self, value: Sensor):
+ self._sensor = value
+
def configure_sigan(self, params: dict):
sigan_params = {k: v for k, v in params.items() if k in SIGAN_SETTINGS_KEYS}
for key, value in sigan_params.items():
- if hasattr(self.sigan, key):
+ if hasattr(self.sensor.signal_analyzer, key):
logger.debug(f"Applying setting to sigan: {key}: {value}")
- setattr(self.sigan, key, value)
+ setattr(self.sensor.signal_analyzer, key, value)
else:
logger.warning(f"Sigan does not have attribute {key}")
def configure_preselector(self, params: dict):
+ preselector = self.sensor.preselector
if self.PRESELECTOR_PATH_KEY in params:
path = params[self.PRESELECTOR_PATH_KEY]
logger.debug(f"Setting preselector RF path: {path}")
preselector.set_state(path)
- elif self.has_configurable_preselector:
+ elif self.sensor.has_configurable_preselector:
# Require the RF path to be specified if the sensor has a preselector.
raise ParameterException(
f"No {self.PRESELECTOR_PATH_KEY} value specified in the YAML config."
@@ -104,9 +101,14 @@ def get_sigmf_builder(self, schedule_entry: dict) -> None:
)
sigmf_builder.set_action(action_obj)
- if SENSOR_LOCATION is not None:
- sigmf_builder.set_geolocation(SENSOR_LOCATION)
- sigmf_builder.set_sensor(ntia_sensor.Sensor(**self.sensor_definition))
+ if self.sensor.location is not None:
+ sigmf_builder.set_geolocation(self.sensor.location)
+ if self.sensor.capabilities is not None and hasattr(
+ self.sensor.capabilities, "sensor"
+ ):
+ sigmf_builder.set_sensor(
+ ntia_sensor.Sensor(**self.sensor.capabilities["sensor"])
+ )
self.sigmf_builder = sigmf_builder
@@ -126,5 +128,10 @@ def name(self):
return get_parameter("name", self.parameters)
@abstractmethod
- def __call__(self, schedule_entry, task_id):
+ def __call__(
+ self,
+ sensor: Sensor = None,
+ schedule_entry: Optional[dict] = None,
+ task_id: Optional[int] = None,
+ ):
pass
diff --git a/scos_actions/actions/interfaces/measurement_action.py b/scos_actions/actions/interfaces/measurement_action.py
index 8acc224f..f0b06e54 100644
--- a/scos_actions/actions/interfaces/measurement_action.py
+++ b/scos_actions/actions/interfaces/measurement_action.py
@@ -1,11 +1,10 @@
import logging
from abc import abstractmethod
-from typing import Union
+from typing import Optional
import numpy as np
-
from scos_actions.actions.interfaces.action import Action
-from scos_actions.hardware.mocks.mock_gps import MockGPS
+from scos_actions.hardware.sensor import Sensor
from scos_actions.metadata.structs import ntia_sensor
from scos_actions.metadata.structs.capture import CaptureSegment
from scos_actions.signals import measurement_action_completed
@@ -21,17 +20,16 @@ class MeasurementAction(Action):
"""
- def __init__(self, parameters, sigan, gps=None):
- if gps is None:
- gps = MockGPS()
- super().__init__(parameters, sigan, gps)
+ def __init__(self, parameters: dict):
+ super().__init__(parameters)
self.received_samples = 0
- def __call__(self, schedule_entry: dict, task_id: int):
+ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
+ self._sensor = sensor
+ self.get_sigmf_builder(schedule_entry)
self.test_required_components()
self.configure(self.parameters)
measurement_result = self.execute(schedule_entry, task_id)
- self.get_sigmf_builder(schedule_entry) # Initializes SigMFBuilder
self.create_metadata(measurement_result) # Fill metadata
data = self.transform_data(measurement_result)
self.send_signals(task_id, self.sigmf_builder.metadata, data)
@@ -43,7 +41,7 @@ def create_capture_segment(
center_frequency_Hz: float,
duration_ms: int,
overload: bool,
- sigan_settings: Union[ntia_sensor.SiganSettings, None],
+ sigan_settings: Optional[ntia_sensor.SiganSettings],
) -> CaptureSegment:
capture_segment = CaptureSegment(
sample_start=sample_start,
@@ -53,8 +51,8 @@ def create_capture_segment(
overload=overload,
sigan_settings=sigan_settings,
)
- sigan_cal = self.sigan.sigan_calibration_data
- sensor_cal = self.sigan.sensor_calibration_data
+ sigan_cal = self.sensor.signal_analyzer.sigan_calibration_data
+ sensor_cal = self.sensor.signal_analyzer.sensor_calibration_data
# Rename compression point keys if they exist
# then set calibration metadata if it exists
if sensor_cal is not None:
@@ -72,7 +70,7 @@ def create_capture_segment(
def create_metadata(
self,
measurement_result: dict,
- recording: int = None,
+ recording: Optional[int] = None,
) -> None:
"""Add SigMF metadata to the `sigmf_builder` from the `measurement_result`."""
# Set the received_samples instance variable
@@ -128,7 +126,7 @@ def create_metadata(
def get_sigan_settings(
self, measurement_result: dict
- ) -> Union[ntia_sensor.SiganSettings, None]:
+ ) -> Optional[ntia_sensor.SiganSettings]:
"""
Retrieve any sigan settings from the measurement result dict, and return
a `ntia-sensor` `SiganSettings` object. Values are pulled from the
@@ -148,7 +146,7 @@ def get_sigan_settings(
def test_required_components(self):
"""Fail acquisition if a required component is not available."""
- if not self.sigan.is_available:
+ if not self.sensor.signal_analyzer.is_available:
msg = "acquisition failed: signal analyzer required but not available"
raise RuntimeError(msg)
@@ -168,7 +166,7 @@ def acquire_data(
+ f" and {'' if cal_adjust else 'not '}applying gain adjustment based"
+ " on calibration data"
)
- measurement_result = self.sigan.acquire_time_domain_samples(
+ measurement_result = self.sensor.signal_analyzer.acquire_time_domain_samples(
num_samples,
num_samples_skip=nskip,
cal_adjust=cal_adjust,
diff --git a/scos_actions/actions/logger.py b/scos_actions/actions/logger.py
index 5966235c..ae2c6e60 100644
--- a/scos_actions/actions/logger.py
+++ b/scos_actions/actions/logger.py
@@ -1,8 +1,10 @@
"""A simple example action that logs a message."""
import logging
+from typing import Optional
from scos_actions.actions.interfaces.action import Action
+from scos_actions.hardware.sensor import Sensor
logger = logging.getLogger(__name__)
@@ -21,10 +23,10 @@ class Logger(Action):
"""
def __init__(self, loglvl=LOGLVL_INFO):
- super().__init__(parameters={"name": "logger"}, sigan=None, gps=None)
+ super().__init__(parameters={"name": "logger"})
self.loglvl = loglvl
- def __call__(self, schedule_entry, task_id):
+ def __call__(self, sensor: Optional[Sensor], schedule_entry: dict, task_id: int):
msg = "running test {name}/{tid}"
schedule_entry_name = schedule_entry["name"]
logger.log(
diff --git a/scos_actions/actions/monitor_sigan.py b/scos_actions/actions/monitor_sigan.py
index 55427ac3..5153fd8f 100644
--- a/scos_actions/actions/monitor_sigan.py
+++ b/scos_actions/actions/monitor_sigan.py
@@ -1,9 +1,10 @@
"""Monitor the signal analyzer."""
import logging
+from typing import Optional
from scos_actions.actions.interfaces.action import Action
-from scos_actions.hardware.mocks.mock_gps import MockGPS
+from scos_actions.hardware.sensor import Sensor
from scos_actions.signals import trigger_api_restart
logger = logging.getLogger(__name__)
@@ -12,15 +13,18 @@
class MonitorSignalAnalyzer(Action):
"""Monitor signal analyzer connection and restart container if unreachable."""
- def __init__(self, sigan, parameters={"name": "monitor_sigan"}, gps=None):
- if gps is None:
- gps = MockGPS()
- super().__init__(parameters=parameters, sigan=sigan, gps=gps)
+ def __init__(self, parameters={"name": "monitor_sigan"}):
+ super().__init__(parameters=parameters)
- def __call__(self, schedule_entry: dict, task_id: int):
+ def __call__(
+ self,
+ sensor: Sensor,
+ schedule_entry: Optional[dict] = None,
+ task_id: Optional[int] = None,
+ ):
logger.debug("Performing signal analyzer health check")
-
- healthy = self.sigan.healthy()
+ self._sensor = sensor
+ healthy = self.sensor.signal_analyzer.healthy()
if healthy:
logger.info("signal analyzer healthy")
diff --git a/scos_actions/actions/sync_gps.py b/scos_actions/actions/sync_gps.py
index 5444bcb9..b0f88543 100644
--- a/scos_actions/actions/sync_gps.py
+++ b/scos_actions/actions/sync_gps.py
@@ -4,6 +4,7 @@
import subprocess
from scos_actions.actions.interfaces.action import Action
+from scos_actions.hardware.sensor import Sensor
from scos_actions.signals import location_action_completed
logger = logging.getLogger(__name__)
@@ -12,18 +13,18 @@
class SyncGps(Action):
"""Query the GPS and synchronize time and location."""
- def __init__(self, gps, parameters, sigan):
- super().__init__(parameters=parameters, sigan=sigan, gps=gps)
+ def __init__(self, parameters: dict):
+ super().__init__(parameters=parameters)
- def __call__(self, schedule_entry: dict, task_id: int):
+ def __call__(self, sensor: Sensor, schedule_entry: dict, task_id: int):
logger.debug("Syncing to GPS")
-
- dt = self.gps.get_gps_time()
+ self.sensor = sensor
+ dt = self.sensor.gps.get_gps_time()
date_cmd = ["date", "-s", "{:}".format(dt.strftime("%Y/%m/%d %H:%M:%S"))]
subprocess.check_output(date_cmd, shell=True)
logger.info(f"Set system time to GPS time {dt.ctime()}")
- location = self.gps.get_location()
+ location = sensor.gps.get_location()
if location is None:
raise RuntimeError("Unable to synchronize to GPS")
diff --git a/scos_actions/actions/tests/test_acquire_single_freq_fft.py b/scos_actions/actions/tests/test_acquire_single_freq_fft.py
index e21cb4e4..0419d7cc 100644
--- a/scos_actions/actions/tests/test_acquire_single_freq_fft.py
+++ b/scos_actions/actions/tests/test_acquire_single_freq_fft.py
@@ -1,6 +1,7 @@
-from scos_actions.actions.tests.utils import SENSOR_DEFINITION, check_metadata_fields
-from scos_actions.capabilities import capabilities
+from scos_actions.actions.tests.utils import check_metadata_fields
from scos_actions.discover import test_actions as actions
+from scos_actions.hardware.mocks.mock_sigan import MockSignalAnalyzer
+from scos_actions.hardware.sensor import Sensor
from scos_actions.signals import measurement_action_completed
SINGLE_FREQUENCY_FFT_ACQUISITION = {
@@ -10,7 +11,6 @@
"interval": None,
"action": "test_single_frequency_m4s_action",
}
-capabilities["sensor"] = SENSOR_DEFINITION
def test_detector():
@@ -29,7 +29,12 @@ def callback(sender, **kwargs):
measurement_action_completed.connect(callback)
action = actions["test_single_frequency_m4s_action"]
assert action.description
- action(SINGLE_FREQUENCY_FFT_ACQUISITION, 1)
+ sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities={})
+ action(
+ sensor=sensor,
+ schedule_entry=SINGLE_FREQUENCY_FFT_ACQUISITION,
+ task_id=1,
+ )
assert _task_id
assert _data.any()
assert _metadata
@@ -84,5 +89,6 @@ def callback(sender, **kwargs):
def test_num_samples_skip():
action = actions["test_single_frequency_m4s_action"]
assert action.description
- action(SINGLE_FREQUENCY_FFT_ACQUISITION, 1)
- assert action.sigan._num_samples_skip == action.parameters["nskip"]
+ sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities={})
+ action(sensor, SINGLE_FREQUENCY_FFT_ACQUISITION, 1)
+ assert action.sensor.signal_analyzer._num_samples_skip == action.parameters["nskip"]
diff --git a/scos_actions/actions/tests/test_monitor_sigan.py b/scos_actions/actions/tests/test_monitor_sigan.py
index 3cd2e737..9a00ea71 100644
--- a/scos_actions/actions/tests/test_monitor_sigan.py
+++ b/scos_actions/actions/tests/test_monitor_sigan.py
@@ -1,4 +1,6 @@
from scos_actions.discover import test_actions as actions
+from scos_actions.hardware.mocks.mock_sigan import MockSignalAnalyzer
+from scos_actions.hardware.sensor import Sensor
from scos_actions.signals import trigger_api_restart
MONITOR_SIGAN_SCHEDULE = {
@@ -19,11 +21,12 @@ def callback(sender, **kwargs):
trigger_api_restart.connect(callback)
action = actions["test_monitor_sigan"]
- sigan = action.sigan
- sigan._is_available = False
- action(MONITOR_SIGAN_SCHEDULE, 1)
+ mock_sigan = MockSignalAnalyzer()
+ mock_sigan._is_available = False
+ sensor = Sensor(signal_analyzer=mock_sigan, capabilities={})
+ action(sensor, MONITOR_SIGAN_SCHEDULE, 1)
assert _api_restart_triggered == True # signal sent
- sigan._is_available = True
+ mock_sigan._is_available = True
def test_monitor_sigan_not_healthy():
@@ -35,9 +38,10 @@ def callback(sender, **kwargs):
trigger_api_restart.connect(callback)
action = actions["test_monitor_sigan"]
- sigan = action.sigan
- sigan.times_to_fail_recv = 6
- action(MONITOR_SIGAN_SCHEDULE, 1)
+ mock_sigan = MockSignalAnalyzer()
+ mock_sigan.times_to_fail_recv = 6
+ sensor = Sensor(signal_analyzer=mock_sigan, capabilities={})
+ action(sensor, MONITOR_SIGAN_SCHEDULE, 1)
assert _api_restart_triggered == True # signal sent
@@ -50,8 +54,9 @@ def callback(sender, **kwargs):
trigger_api_restart.connect(callback)
action = actions["test_monitor_sigan"]
- sigan = action.sigan
- sigan._is_available = True
- sigan.set_times_to_fail_recv(0)
- action(MONITOR_SIGAN_SCHEDULE, 1)
+ mock_sigan = MockSignalAnalyzer()
+ mock_sigan._is_available = True
+ mock_sigan.set_times_to_fail_recv(0)
+ sensor = Sensor(signal_analyzer=mock_sigan, capabilities={})
+ action(sensor, MONITOR_SIGAN_SCHEDULE, 1)
assert _api_restart_triggered == False # signal not sent
diff --git a/scos_actions/actions/tests/test_single_freq_tdomain_iq.py b/scos_actions/actions/tests/test_single_freq_tdomain_iq.py
index fc388fd2..67d86495 100644
--- a/scos_actions/actions/tests/test_single_freq_tdomain_iq.py
+++ b/scos_actions/actions/tests/test_single_freq_tdomain_iq.py
@@ -2,6 +2,8 @@
from scos_actions.actions.tests.utils import check_metadata_fields
from scos_actions.discover import test_actions as actions
+from scos_actions.hardware.mocks.mock_sigan import MockSignalAnalyzer
+from scos_actions.hardware.sensor import Sensor
from scos_actions.signals import measurement_action_completed
SINGLE_TIMEDOMAIN_IQ_ACQUISITION = {
@@ -29,7 +31,8 @@ def callback(sender, **kwargs):
measurement_action_completed.connect(callback)
action = actions["test_single_frequency_iq_action"]
assert action.description
- action(SINGLE_TIMEDOMAIN_IQ_ACQUISITION, 1)
+ sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities={})
+ action(sensor, SINGLE_TIMEDOMAIN_IQ_ACQUISITION, 1)
assert _data.any()
assert _metadata
assert _task_id == 1
@@ -57,15 +60,18 @@ def callback(sender, **kwargs):
def test_required_components():
action = actions["test_single_frequency_m4s_action"]
- sigan = action.sigan
- sigan._is_available = False
+ mock_sigan = MockSignalAnalyzer()
+ mock_sigan._is_available = False
+ sensor = Sensor(signal_analyzer=mock_sigan, capabilities={})
with pytest.raises(RuntimeError):
- action(SINGLE_TIMEDOMAIN_IQ_ACQUISITION, 1)
- sigan._is_available = True
+ action(sensor, SINGLE_TIMEDOMAIN_IQ_ACQUISITION, 1)
+ mock_sigan._is_available = True
def test_num_samples_skip():
action = actions["test_single_frequency_iq_action"]
assert action.description
- action(SINGLE_TIMEDOMAIN_IQ_ACQUISITION, 1)
- assert action.sigan._num_samples_skip == action.parameters["nskip"]
+ mock_sigan = MockSignalAnalyzer()
+ sensor = Sensor(signal_analyzer=mock_sigan, capabilities={})
+ action(sensor, SINGLE_TIMEDOMAIN_IQ_ACQUISITION, 1)
+ assert action.sensor.signal_analyzer._num_samples_skip == action.parameters["nskip"]
diff --git a/scos_actions/actions/tests/test_stepped_freq_tdomain_iq.py b/scos_actions/actions/tests/test_stepped_freq_tdomain_iq.py
index 6ef6881f..e06920cc 100644
--- a/scos_actions/actions/tests/test_stepped_freq_tdomain_iq.py
+++ b/scos_actions/actions/tests/test_stepped_freq_tdomain_iq.py
@@ -1,4 +1,6 @@
from scos_actions.discover import test_actions as actions
+from scos_actions.hardware.mocks.mock_sigan import MockSignalAnalyzer
+from scos_actions.hardware.sensor import Sensor
from scos_actions.signals import measurement_action_completed
SINGLE_TIMEDOMAIN_IQ_MULTI_RECORDING_ACQUISITION = {
@@ -32,7 +34,9 @@ def callback(sender, **kwargs):
measurement_action_completed.connect(callback)
action = actions["test_multi_frequency_iq_action"]
assert action.description
- action(SINGLE_TIMEDOMAIN_IQ_MULTI_RECORDING_ACQUISITION, 1)
+ mock_sigan = MockSignalAnalyzer()
+ sensor = Sensor(signal_analyzer=mock_sigan, capabilities={})
+ action(sensor, SINGLE_TIMEDOMAIN_IQ_MULTI_RECORDING_ACQUISITION, 1)
for i in range(_count):
assert _datas[i].any()
assert _metadatas[i]
@@ -44,8 +48,16 @@ def callback(sender, **kwargs):
def test_num_samples_skip():
action = actions["test_multi_frequency_iq_action"]
assert action.description
- action(SINGLE_TIMEDOMAIN_IQ_MULTI_RECORDING_ACQUISITION, 1)
+ mock_sigan = MockSignalAnalyzer()
+ sensor = Sensor(signal_analyzer=mock_sigan, capabilities={})
+ action(sensor, SINGLE_TIMEDOMAIN_IQ_MULTI_RECORDING_ACQUISITION, 1)
if isinstance(action.parameters["nskip"], list):
- assert action.sigan._num_samples_skip == action.parameters["nskip"][-1]
+ assert (
+ action.sensor.signal_analyzer._num_samples_skip
+ == action.parameters["nskip"][-1]
+ )
else:
- assert action.sigan._num_samples_skip == action.parameters["nskip"]
+ assert (
+ action.sensor.signal_analyzer._num_samples_skip
+ == action.parameters["nskip"]
+ )
diff --git a/scos_actions/actions/tests/test_sync_gps.py b/scos_actions/actions/tests/test_sync_gps.py
index f60d6b76..ed7b2691 100644
--- a/scos_actions/actions/tests/test_sync_gps.py
+++ b/scos_actions/actions/tests/test_sync_gps.py
@@ -4,6 +4,9 @@
import pytest
from scos_actions.discover import test_actions
+from scos_actions.hardware.mocks.mock_gps import MockGPS
+from scos_actions.hardware.mocks.mock_sigan import MockSignalAnalyzer
+from scos_actions.hardware.sensor import Sensor
from scos_actions.signals import location_action_completed
SYNC_GPS = {
@@ -15,7 +18,7 @@
}
-def test_detector():
+def test_location_action_completed():
_latitude = None
_longitude = None
@@ -27,12 +30,15 @@ def callback(sender, **kwargs):
location_action_completed.connect(callback)
action = test_actions["test_sync_gps"]
+ sensor = Sensor(
+ signal_analyzer=MockSignalAnalyzer(), capabilities={}, gps=MockGPS()
+ )
if sys.platform == "linux":
- action(SYNC_GPS, 1)
+ action(sensor, SYNC_GPS, 1)
assert _latitude
assert _longitude
elif sys.platform == "win32":
with pytest.raises(subprocess.CalledProcessError):
- action(SYNC_GPS, 1)
+ action(sensor, SYNC_GPS, 1)
else:
raise NotImplementedError("Test not implemented for current OS.")
diff --git a/scos_actions/actions/tests/utils.py b/scos_actions/actions/tests/utils.py
index da758289..9b8f0c61 100644
--- a/scos_actions/actions/tests/utils.py
+++ b/scos_actions/actions/tests/utils.py
@@ -1,13 +1,5 @@
from sigmf.validate import validate as sigmf_validate
-SENSOR_DEFINITION = {
- "id": "",
- "sensor_spec": {"id": "", "model": "greyhound"},
- "antenna": {"antenna_spec": {"id": "", "model": "L-com HG3512UP-NF"}},
- "signal_analyzer": {"sigan_spec": {"id": "", "model": "Ettus USRP B210"}},
- "computer_spec": {"id": "", "model": "Intel NUC"},
-}
-
def check_metadata_fields(
metadata, action, entry_name, action_name, task_id, recording=None
diff --git a/scos_actions/calibration/__init__.py b/scos_actions/calibration/__init__.py
index e78f4c7e..e69de29b 100644
--- a/scos_actions/calibration/__init__.py
+++ b/scos_actions/calibration/__init__.py
@@ -1,90 +0,0 @@
-import logging
-from os import path
-
-from scos_actions.calibration.calibration import Calibration, load_from_json
-from scos_actions.settings import (
- DEFAULT_CALIBRATION_FILE,
- SENSOR_CALIBRATION_FILE,
- SIGAN_CALIBRATION_FILE,
-)
-
-logger = logging.getLogger(__name__)
-
-
-def get_sigan_calibration(sigan_cal_file: str) -> Calibration:
- """
- Load signal analyzer calibration data from file.
-
- :param sigan_cal_file: Path to JSON file containing signal
- analyzer calibration data.
- :return: The signal analyzer ``Calibration`` object.
- """
- try:
- sigan_cal = load_from_json(sigan_cal_file)
- except Exception:
- sigan_cal = None
- logger.exception("Unable to load sigan calibration data, reverting to none")
- return sigan_cal
-
-
-def get_sensor_calibration(sensor_cal_file: str) -> Calibration:
- """
- Load sensor calibration data from file.
-
- :param sensor_cal_file: Path to JSON file containing sensor
- calibration data.
- :return: The sensor ``Calibration`` object.
- """
- try:
- sensor_cal = load_from_json(sensor_cal_file)
- except Exception:
- sensor_cal = None
- logger.exception("Unable to load sensor calibration data, reverting to none")
- return sensor_cal
-
-
-def check_for_default_calibration(cal_file_path: str, cal_type: str) -> bool:
- default_cal = False
- if cal_file_path == DEFAULT_CALIBRATION_FILE:
- default_cal = True
- logger.warning(
- f"***************LOADING DEFAULT {cal_type} CALIBRATION***************"
- )
- return default_cal
-
-
-sensor_calibration = None
-if SENSOR_CALIBRATION_FILE is None or SENSOR_CALIBRATION_FILE == "":
- logger.warning(
- "No sensor calibration file specified. Not loading calibration file."
- )
-elif not path.exists(SENSOR_CALIBRATION_FILE):
- logger.warning(
- SENSOR_CALIBRATION_FILE
- + " does not exist. Not loading sensor calibration file."
- )
-else:
- logger.debug(f"Loading sensor cal file: {SENSOR_CALIBRATION_FILE}")
- default_sensor_calibration = check_for_default_calibration(
- SENSOR_CALIBRATION_FILE, "Sensor"
- )
- sensor_calibration = get_sensor_calibration(SENSOR_CALIBRATION_FILE)
-
-sigan_calibration = None
-default_sensor_calibration = False
-default_sigan_calibration = False
-if SIGAN_CALIBRATION_FILE is None or SIGAN_CALIBRATION_FILE == "":
- logger.warning("No sigan calibration file specified. Not loading calibration file.")
-elif not path.exists(SIGAN_CALIBRATION_FILE):
- logger.warning(
- SIGAN_CALIBRATION_FILE + " does not exist. Not loading sigan calibration file."
- )
-else:
- logger.debug(f"Loading sigan cal file: {SIGAN_CALIBRATION_FILE}")
- default_sigan_calibration = check_for_default_calibration(
- SIGAN_CALIBRATION_FILE, "Sigan"
- )
- sigan_calibration = get_sigan_calibration(SIGAN_CALIBRATION_FILE)
-
-if sensor_calibration:
- logger.debug(f"Last sensor cal: {sensor_calibration.last_calibration_datetime}")
diff --git a/scos_actions/calibration/calibration.py b/scos_actions/calibration/calibration.py
index 9d724af7..2506d825 100644
--- a/scos_actions/calibration/calibration.py
+++ b/scos_actions/calibration/calibration.py
@@ -15,6 +15,8 @@ class Calibration:
calibration_parameters: List[str]
calibration_data: dict
clock_rate_lookup_by_sample_rate: List[Dict[str, float]]
+ is_default: bool
+ file_path: Path
def __post_init__(self):
# Convert key names in calibration_data to strings
@@ -55,7 +57,6 @@ def update(
gain_dB: float,
noise_figure_dB: float,
temp_degC: float,
- file_path: Path,
) -> None:
"""
Update the calibration data by overwriting or adding an entry.
@@ -116,11 +117,11 @@ def update(
"clock_rate_lookup_by_sample_rate": self.clock_rate_lookup_by_sample_rate,
"calibration_data": self.calibration_data,
}
- with open(file_path, "w") as outfile:
+ with open(self.file_path, "w") as outfile:
outfile.write(json.dumps(cal_dict))
-def load_from_json(fname: Path) -> Calibration:
+def load_from_json(fname: Path, is_default: bool) -> Calibration:
"""
Load a calibration from a JSON file.
@@ -131,6 +132,8 @@ def load_from_json(fname: Path) -> Calibration:
``clock_rate_lookup_by_sample_rate``
:param fname: The ``Path`` to the JSON calibration file.
+ :param is_default: If True, the loaded calibration file
+ is treated as the default calibration file.
:raises Exception: If the provided file does not include
the required keys.
:return: The ``Calibration`` object generated from the file.
@@ -157,6 +160,8 @@ def load_from_json(fname: Path) -> Calibration:
calibration["calibration_parameters"],
calibration["calibration_data"],
calibration["clock_rate_lookup_by_sample_rate"],
+ is_default,
+ fname,
)
diff --git a/scos_actions/calibration/tests/test_calibration.py b/scos_actions/calibration/tests/test_calibration.py
index 75fc1eb7..9e503acd 100644
--- a/scos_actions/calibration/tests/test_calibration.py
+++ b/scos_actions/calibration/tests/test_calibration.py
@@ -9,7 +9,6 @@
import pytest
-from scos_actions.calibration import sensor_calibration, sigan_calibration
from scos_actions.calibration.calibration import (
Calibration,
filter_by_parameter,
@@ -181,7 +180,7 @@ def setup_calibration_file(self, tmpdir):
json.dump(cal_data, file, indent=4)
# Load the data back in
- self.sample_cal = load_from_json(self.calibration_file)
+ self.sample_cal = load_from_json(self.calibration_file, False)
# Create a list of previous points to ensure that we don't repeat
self.pytest_points = []
@@ -235,6 +234,8 @@ def test_get_calibration_dict_exact_match_lookup(self):
calibration_params,
calibration_data,
clock_rate_lookup_by_sample_rate,
+ False,
+ Path(""),
)
cal_data = cal.get_calibration_dict([100.0, 200.0])
assert cal_data["NF"] == "NF at 100, 200"
@@ -247,11 +248,14 @@ def test_get_calibration_dict_within_range(self):
200.0: {100.0: {"NF": "NF at 200, 100"}},
}
clock_rate_lookup_by_sample_rate = {}
+ test_cal_path = Path("test_calibration.json")
cal = Calibration(
calibration_datetime,
calibration_params,
calibration_data,
clock_rate_lookup_by_sample_rate,
+ False,
+ test_cal_path,
)
with pytest.raises(CalibrationException) as e_info:
cal_data = cal.get_calibration_dict([100.0, 250.0])
@@ -286,17 +290,19 @@ def test_update(self):
calibration_params = ["sample_rate", "frequency"]
calibration_data = {100.0: {200.0: {"noise_figure": 0, "gain": 0}}}
clock_rate_lookup_by_sample_rate = {}
+ test_cal_path = Path("test_calibration.json")
cal = Calibration(
calibration_datetime,
calibration_params,
calibration_data,
clock_rate_lookup_by_sample_rate,
+ False,
+ test_cal_path,
)
action_params = {"sample_rate": 100.0, "frequency": 200.0}
update_time = get_datetime_str_now()
- test_cal_path = Path("test_calibration.json")
- cal.update(action_params, update_time, 30.0, 5.0, 21, test_cal_path)
- cal_from_file = load_from_json(test_cal_path)
+ cal.update(action_params, update_time, 30.0, 5.0, 21)
+ cal_from_file = load_from_json(test_cal_path, False)
test_cal_path.unlink()
file_utc_time = parse_datetime_iso_format_str(cal.last_calibration_datetime)
cal_time_utc = parse_datetime_iso_format_str(update_time)
diff --git a/scos_actions/capabilities/__init__.py b/scos_actions/capabilities/__init__.py
deleted file mode 100644
index d17bd457..00000000
--- a/scos_actions/capabilities/__init__.py
+++ /dev/null
@@ -1,46 +0,0 @@
-import hashlib
-import json
-import logging
-
-from scos_actions import utils
-from scos_actions.metadata.utils import construct_geojson_point
-from scos_actions.settings import FQDN, SENSOR_DEFINITION_FILE
-
-logger = logging.getLogger(__name__)
-capabilities = {}
-SENSOR_DEFINITION_HASH = None
-SENSOR_LOCATION = None
-
-logger.debug(f"Loading {SENSOR_DEFINITION_FILE}")
-try:
- capabilities["sensor"] = utils.load_from_json(SENSOR_DEFINITION_FILE)
-except Exception as e:
- logger.warning(
- f"Failed to load sensor definition file: {SENSOR_DEFINITION_FILE}"
- + "\nAn empty sensor definition will be used"
- )
- capabilities["sensor"] = {"sensor_spec": {"id": "unknown"}}
- capabilities["sensor"]["sensor_sha512"] = "UNKNOWN SENSOR DEFINITION"
-
-# Extract location from sensor definition file, if present
-if "location" in capabilities["sensor"]:
- try:
- sensor_loc = capabilities["sensor"].pop("location")
- SENSOR_LOCATION = construct_geojson_point(
- sensor_loc["x"],
- sensor_loc["y"],
- sensor_loc["z"] if "z" in sensor_loc else None,
- )
- except:
- logger.exception("Failed to get sensor location from sensor definition.")
-
-# Generate sensor definition file hash (SHA 512)
-try:
- if "sensor_sha512" not in capabilities["sensor"]:
- sensor_def = json.dumps(capabilities["sensor"], sort_keys=True)
- SENSOR_DEFINITION_HASH = hashlib.sha512(sensor_def.encode("UTF-8")).hexdigest()
- capabilities["sensor"]["sensor_sha512"] = SENSOR_DEFINITION_HASH
-except:
- capabilities["sensor"]["sensor_sha512"] = "ERROR GENERATING HASH"
- # SENSOR_DEFINITION_HASH is None, do not raise Exception
- logger.exception(f"Unable to generate sensor definition hash")
diff --git a/scos_actions/discover/__init__.py b/scos_actions/discover/__init__.py
index f77df84d..84ba8e6e 100644
--- a/scos_actions/discover/__init__.py
+++ b/scos_actions/discover/__init__.py
@@ -3,22 +3,13 @@
from scos_actions.actions.monitor_sigan import MonitorSignalAnalyzer
from scos_actions.actions.sync_gps import SyncGps
from scos_actions.discover.yaml import load_from_yaml
-from scos_actions.hardware.mocks.mock_gps import MockGPS
-from scos_actions.hardware.mocks.mock_sigan import MockSignalAnalyzer
-from scos_actions.settings import ACTION_DEFINITIONS_DIR, MOCK_SIGAN
-from scos_actions.signals import register_component_with_status
+from scos_actions.settings import ACTION_DEFINITIONS_DIR
-mock_sigan = MockSignalAnalyzer(randomize_values=True)
-mock_gps = MockGPS()
-if MOCK_SIGAN:
- register_component_with_status.send(mock_sigan.__class__, component=mock_sigan)
actions = {"logger": Logger()}
test_actions = {
- "test_sync_gps": SyncGps(
- parameters={"name": "test_sync_gps"}, sigan=mock_sigan, gps=mock_gps
- ),
+ "test_sync_gps": SyncGps(parameters={"name": "test_sync_gps"}),
"test_monitor_sigan": MonitorSignalAnalyzer(
- parameters={"name": "test_monitor_sigan"}, sigan=mock_sigan
+ parameters={"name": "test_monitor_sigan"}
),
"logger": Logger(),
}
@@ -26,15 +17,11 @@
def init(
action_classes=action_classes,
- sigan=mock_sigan,
- gps=mock_gps,
yaml_dir=ACTION_DEFINITIONS_DIR,
):
yaml_actions = {}
yaml_test_actions = {}
- for key, value in load_from_yaml(
- action_classes, sigan=sigan, gps=gps, yaml_dir=yaml_dir
- ).items():
+ for key, value in load_from_yaml(action_classes, yaml_dir=yaml_dir).items():
if key.startswith("test_"):
yaml_test_actions[key] = value
else:
diff --git a/scos_actions/discover/tests/test_yaml.py b/scos_actions/discover/tests/test_yaml.py
index 479892aa..473b39dd 100644
--- a/scos_actions/discover/tests/test_yaml.py
+++ b/scos_actions/discover/tests/test_yaml.py
@@ -30,7 +30,7 @@
def test_load_from_yaml_existing():
"""Any existing action definitions should be valid yaml."""
- load_from_yaml(actions.action_classes, sigan, gps)
+ load_from_yaml(actions.action_classes)
def _test_load_from_yaml_check_error(yaml_to_write, expected_error):
@@ -44,7 +44,7 @@ def _test_load_from_yaml_check_error(yaml_to_write, expected_error):
tmpfile.seek(0)
# Now try to load the invalid yaml file, expecting an error
with pytest.raises(expected_error):
- load_from_yaml(actions.action_classes, sigan, gps=gps, yaml_dir=tmpdir)
+ load_from_yaml(actions.action_classes, yaml_dir=tmpdir)
os.unlink(tmpfile.name)
diff --git a/scos_actions/discover/yaml.py b/scos_actions/discover/yaml.py
index bf76d9bd..c0ae8eed 100644
--- a/scos_actions/discover/yaml.py
+++ b/scos_actions/discover/yaml.py
@@ -8,7 +8,7 @@
logger = logging.getLogger(__name__)
-def load_from_yaml(action_classes, sigan, gps, yaml_dir: Path = ACTION_DEFINITIONS_DIR):
+def load_from_yaml(action_classes, yaml_dir: Path = ACTION_DEFINITIONS_DIR):
"""Load any YAML files in yaml_dir."""
parsed_actions = {}
yaml = YAML(typ="safe")
@@ -18,9 +18,7 @@ def load_from_yaml(action_classes, sigan, gps, yaml_dir: Path = ACTION_DEFINITIO
for class_name, parameters in definition.items():
try:
logger.debug(f"Attempting to configure: {class_name}")
- action = action_classes[class_name](
- parameters=parameters, sigan=sigan, gps=gps
- )
+ action = action_classes[class_name](parameters=parameters)
parsed_actions[action.name] = action
except KeyError as exc:
err = "Nonexistent action class name {!r} referenced in {!r}"
diff --git a/scos_actions/hardware/__init__.py b/scos_actions/hardware/__init__.py
index 34af394d..e69de29b 100644
--- a/scos_actions/hardware/__init__.py
+++ b/scos_actions/hardware/__init__.py
@@ -1,75 +0,0 @@
-import importlib
-import logging
-from pathlib import Path
-
-from its_preselector.configuration_exception import ConfigurationException
-from its_preselector.controlbyweb_web_relay import ControlByWebWebRelay
-
-from scos_actions import utils
-from scos_actions.capabilities import capabilities
-from scos_actions.settings import (
- PRESELECTOR_CLASS,
- PRESELECTOR_CONFIG_FILE,
- PRESELECTOR_MODULE,
- SWITCH_CONFIGS_DIR,
-)
-from scos_actions.signals import register_component_with_status
-from scos_actions.status.status_registration_handler import status_registration_handler
-
-logger = logging.getLogger(__name__)
-
-
-def load_switches(switch_dir: Path) -> dict:
- switch_dict = {}
- if switch_dir is not None and switch_dir.is_dir():
- for f in switch_dir.iterdir():
- file_path = f.resolve()
- logger.debug(f"loading switch config {file_path}")
- conf = utils.load_from_json(file_path)
- try:
- switch = ControlByWebWebRelay(conf)
- logger.debug(f"Adding {switch.id}")
-
- switch_dict[switch.id] = switch
- logger.debug(f"Registering switch status for {switch.name}")
- register_component_with_status.send(__name__, component=switch)
- except ConfigurationException:
- logger.error(f"Unable to configure switch defined in: {file_path}")
-
- return switch_dict
-
-
-def load_preselector_from_file(preselector_config_file: Path):
- if preselector_config_file is None:
- return None
- else:
- try:
- preselector_config = utils.load_from_json(preselector_config_file)
- return load_preselector(
- preselector_config, PRESELECTOR_MODULE, PRESELECTOR_CLASS
- )
- except ConfigurationException:
- logger.exception(
- f"Unable to create preselector defined in: {preselector_config_file}"
- )
- return None
-
-
-def load_preselector(preselector_config, module, preselector_class_name):
- if module is not None and preselector_class_name is not None:
- preselector_module = importlib.import_module(module)
- preselector_constructor = getattr(preselector_module, preselector_class_name)
- ps = preselector_constructor(capabilities["sensor"], preselector_config)
- if ps and ps.name:
- logger.debug(f"Registering {ps.name} as status provider")
- register_component_with_status.send(__name__, component=ps)
- else:
- ps = None
- return ps
-
-
-register_component_with_status.connect(status_registration_handler)
-logger.debug("Connected status registration handler")
-preselector = load_preselector_from_file(PRESELECTOR_CONFIG_FILE)
-switches = load_switches(SWITCH_CONFIGS_DIR)
-logger.debug(f"Loaded {(len(switches))} switches.")
diff --git a/scos_actions/hardware/mocks/mock_sigan.py b/scos_actions/hardware/mocks/mock_sigan.py
index 28379ab6..49687574 100644
--- a/scos_actions/hardware/mocks/mock_sigan.py
+++ b/scos_actions/hardware/mocks/mock_sigan.py
@@ -1,10 +1,12 @@
"""Mock a signal analyzer for testing."""
import logging
from collections import namedtuple
+from typing import Optional
import numpy as np
from scos_actions import __version__ as SCOS_ACTIONS_VERSION
+from scos_actions.calibration.calibration import Calibration
from scos_actions.hardware.sigan_iface import SignalAnalyzerInterface
from scos_actions.utils import get_datetime_str_now
@@ -24,8 +26,14 @@ class MockSignalAnalyzer(SignalAnalyzerInterface):
gain: requested gain in dB
"""
- def __init__(self, randomize_values=False):
- super().__init__()
+ def __init__(
+ self,
+ sensor_cal: Optional[Calibration] = None,
+ sigan_cal: Optional[Calibration] = None,
+ switches: Optional[dict] = None,
+ randomize_values: bool = False,
+ ):
+ super().__init__(sensor_cal, sigan_cal, switches)
# Define the default calibration dicts
self.DEFAULT_SIGAN_CALIBRATION = {
"datetime": get_datetime_str_now(),
@@ -204,7 +212,7 @@ def update_calibration(self, params):
def recompute_sensor_calibration_data(self, cal_args: list) -> None:
if self.sensor_calibration is not None:
self.sensor_calibration_data.update(
- self.sensor_calibration.get_calibration_dict(cal_args)
+ self._sensor_calibration.get_calibration_dict(cal_args)
)
else:
logger.warning("Sensor calibration does not exist.")
diff --git a/scos_actions/hardware/sensor.py b/scos_actions/hardware/sensor.py
new file mode 100644
index 00000000..b9e14806
--- /dev/null
+++ b/scos_actions/hardware/sensor.py
@@ -0,0 +1,177 @@
+import datetime
+import hashlib
+import json
+import logging
+from typing import Dict, Optional
+
+from its_preselector.preselector import Preselector
+from its_preselector.web_relay import WebRelay
+
+from .gps_iface import GPSInterface
+from .sigan_iface import SignalAnalyzerInterface
+
+
+class Sensor:
+ """
+ Software representation of the physical RF sensor. The Sensor may include a GPSInterface,
+ Preselector, a dictionary of WebRelays, a location specified in GeoJSON, and a dictionary
+ of the sensor capabilities. The capabilities should include a 'sensor' key that maps to
+ the metadata definition of the Sensor(
+ https://github.com/NTIA/sigmf-ns-ntia/blob/master/ntia-sensor.sigmf-ext.md#01-the-sensor-object),
+ and an 'action' key that maps to a list of ntia-scos action objects
+ (https://github.com/NTIA/sigmf-ns-ntia/blob/master/ntia-scos.sigmf-ext.md#02-the-action-object)
+ The Sensor instance is passed into Actions __call__ methods to perform an action.
+ """
+
+ logger = logging.getLogger(__name__)
+
+ def __init__(
+ self,
+ signal_analyzer: SignalAnalyzerInterface,
+ capabilities: dict,
+ gps: Optional[GPSInterface] = None,
+ preselector: Optional[Preselector] = None,
+ switches: Optional[Dict[str, WebRelay]] = {},
+ location: Optional[dict] = None,
+ ):
+ self.signal_analyzer = signal_analyzer
+ self.gps = gps
+ self.preselector = preselector
+ self.switches = switches
+ self.location = location
+ self.capabilities = capabilities
+ # There is no setter for start_time property
+ self._start_time = datetime.datetime.utcnow()
+
+ @property
+ def signal_analyzer(self) -> SignalAnalyzerInterface:
+ return self._signal_analyzer
+
+ @signal_analyzer.setter
+ def signal_analyzer(self, sigan: SignalAnalyzerInterface):
+ self._signal_analyzer = sigan
+
+ @property
+ def gps(self) -> GPSInterface:
+ """
+ The sensor's Global Positioning System.
+ """
+ return self._gps
+
+ @gps.setter
+ def gps(self, gps: GPSInterface):
+ """
+ Set the sensor's Global Positioning System.
+ """
+ self._gps = gps
+
+ @property
+ def preselector(self) -> Preselector:
+ """
+ RF front end that may include calibration sources, filters, and/or amplifiers.
+ """
+ return self._preselector
+
+ @preselector.setter
+ def preselector(self, preselector: Preselector):
+ """
+ Set the RF front end that may include calibration sources, filters, and/or amplifiers.
+ """
+ self._preselector = preselector
+
+ @property
+ def switches(self) -> Dict[str, WebRelay]:
+ """
+ Dictionary of WebRelays, indexed by name. WebRelays may enable/disable other
+ components within the sensor and/or provide a variety of sensors.
+ """
+ return self._switches
+
+ @switches.setter
+ def switches(self, switches: Dict[str, WebRelay]):
+ self._switches = switches
+
+ @property
+ def location(self) -> dict:
+ """
+ The GeoJSON dictionary of the sensor's location.
+ """
+ return self._location
+
+ @location.setter
+ def location(self, loc: dict):
+ """
+ Set the GeoJSON location of the sensor.
+ """
+ self._location = loc
+
+ @property
+ def capabilities(self) -> dict:
+ """
+ A dictionary of the sensor's capabilities. The dictionary should
+ include a 'sensor' key that maps to the ntia-sensor
+ (https://github.com/NTIA/sigmf-ns-ntia/blob/master/ntia-sensor.sigmf-ext.md)
+ object and an actions key that maps to a list of ntia-scos action objects
+ (https://github.com/NTIA/sigmf-ns-ntia/blob/master/ntia-scos.sigmf-ext.md#02-the-action-object)
+ """
+ return self._capabilities
+
+ @capabilities.setter
+ def capabilities(self, capabilities: dict):
+ """
+ Set the dictionary of the sensor's capabilities. The dictionary should
+ include a 'sensor' key that links to the ntia-sensor
+ (https://github.com/NTIA/sigmf-ns-ntia/blob/master/ntia-sensor.sigmf-ext.md)
+ object and an actions key that links to a list of ntia-scos action objects
+ (https://github.com/NTIA/sigmf-ns-ntia/blob/master/ntia-scos.sigmf-ext.md#02-the-action-object)
+ """
+ if capabilities is not None:
+ if (
+ "sensor" in capabilities
+ and "sensor_sha512" not in capabilities["sensor"]
+ ):
+ sensor_def = json.dumps(capabilities["sensor"], sort_keys=True)
+ sensor_definition_hash = hashlib.sha512(
+ sensor_def.encode("UTF-8")
+ ).hexdigest()
+ capabilities["sensor"]["sensor_sha512"] = sensor_definition_hash
+ self._capabilities = capabilities
+
+ @property
+ def has_configurable_preselector(self) -> bool:
+ """
+ Checks if the preselector has multiple rf paths.
+ Returns: True if either the Preselector object or the sensor definition contain multiple rf_paths, False
+ otherwise.
+ """
+ if (
+ self.preselector is not None
+ and self.preselector.rf_paths is not None
+ and len(self.preselector.rf_paths) > 0
+ ):
+ self.logger.debug(
+ "Preselector is configurable: found multiple rf_paths in preselector object."
+ )
+ return True
+ elif (
+ self.capabilities
+ and len(
+ self.capabilities.get("sensor", {})
+ .get("preselector", {})
+ .get("rf_paths", [])
+ )
+ > 1
+ ):
+ self.logger.debug(
+ "Preselector is configurable: found multiple rf_paths in sensor definition."
+ )
+ return True
+ else:
+ self.logger.debug(
+ "Preselector is not configurable: Neither sensor definition or preselector object contained multiple rf_paths."
+ )
+ return False
+
+ @property
+ def start_time(self):
+ return self._start_time
diff --git a/scos_actions/hardware/sigan_iface.py b/scos_actions/hardware/sigan_iface.py
index 786025d3..7ed01d45 100644
--- a/scos_actions/hardware/sigan_iface.py
+++ b/scos_actions/hardware/sigan_iface.py
@@ -1,10 +1,10 @@
-import copy
import logging
import time
from abc import ABC, abstractmethod
+from typing import Dict, Optional
-from scos_actions.calibration import sensor_calibration, sigan_calibration
-from scos_actions.capabilities import capabilities
+from its_preselector.web_relay import WebRelay
+from scos_actions.calibration.calibration import Calibration
from scos_actions.hardware.utils import power_cycle_sigan
from scos_actions.utils import convert_string_to_millisecond_iso_format
@@ -23,17 +23,24 @@
class SignalAnalyzerInterface(ABC):
- def __init__(self):
+ def __init__(
+ self,
+ sensor_cal: Optional[Calibration] = None,
+ sigan_cal: Optional[Calibration] = None,
+ switches: Optional[Dict[str, WebRelay]] = None,
+ ):
self.sensor_calibration_data = {}
self.sigan_calibration_data = {}
- self.sensor_calibration = sensor_calibration
- self.sigan_calibration = sigan_calibration
+ self._sensor_calibration = sensor_cal
+ self._sigan_calibration = sigan_cal
+ self._model = "Unknown"
+ self.switches = switches
@property
def last_calibration_time(self) -> str:
"""Returns the last calibration time from calibration data."""
return convert_string_to_millisecond_iso_format(
- sensor_calibration.last_calibration_datetime
+ self.sensor_calibration.last_calibration_datetime
)
@property
@@ -84,7 +91,7 @@ def connect(self) -> None:
"""
pass
- def healthy(self, num_samples=56000):
+ def healthy(self, num_samples: int = 56000) -> bool:
"""Perform health check by collecting IQ samples."""
logger.debug("Performing health check.")
if not self.is_available:
@@ -112,7 +119,7 @@ def power_cycle_and_connect(self, sleep_time: float = 2.0) -> None:
"""
logger.info("Attempting to power cycle the signal analyzer and reconnect.")
try:
- power_cycle_sigan()
+ power_cycle_sigan(self.switches)
except Exception as hce:
logger.warning(f"Unable to power cycle sigan: {hce}")
return
@@ -130,9 +137,9 @@ def power_cycle_and_connect(self, sleep_time: float = 2.0) -> None:
def recompute_sensor_calibration_data(self, cal_args: list) -> None:
self.sensor_calibration_data = {}
- if sensor_calibration is not None:
+ if self.sensor_calibration is not None:
self.sensor_calibration_data.update(
- sensor_calibration.get_calibration_dict(cal_args)
+ self.sensor_calibration.get_calibration_dict(cal_args)
)
else:
logger.warning("Sensor calibration does not exist.")
@@ -140,20 +147,36 @@ def recompute_sensor_calibration_data(self, cal_args: list) -> None:
def recompute_sigan_calibration_data(self, cal_args: list) -> None:
self.sigan_calibration_data = {}
"""Set the sigan calibration data based on the current tuning"""
- if sigan_calibration is not None:
+ if self.sigan_calibration is not None:
self.sigan_calibration_data.update(
- sigan_calibration.get_calibration_dict(cal_args)
+ self.sigan_calibration.get_calibration_dict(cal_args)
)
else:
logger.warning("Sigan calibration does not exist.")
- def get_status(self):
- try:
- sigan_model = capabilities["sensor"]["signal_analyzer"]["sigan_spec"][
- "model"
- ]
- if sigan_model.lower() in ["default", ""]:
- raise KeyError
- except KeyError:
- sigan_model = str(self.__class__)
- return {"model": sigan_model, "healthy": self.healthy()}
+ def get_status(self) -> dict:
+ return {"model": self._model, "healthy": self.healthy()}
+
+ @property
+ def model(self) -> str:
+ return self._model
+
+ @model.setter
+ def model(self, value: str):
+ self._model = value
+
+ @property
+ def sensor_calibration(self) -> Calibration:
+ return self._sensor_calibration
+
+ @sensor_calibration.setter
+ def sensor_calibration(self, cal: Calibration):
+ self._sensor_calibration = cal
+
+ @property
+ def sigan_calibration(self) -> Calibration:
+ return self._sigan_calibration
+
+ @sigan_calibration.setter
+ def sigan_calibration(self, cal: Calibration):
+ self._sigan_calibration = cal
diff --git a/scos_actions/hardware/tests/test_preselector.py b/scos_actions/hardware/tests/test_preselector.py
deleted file mode 100644
index 83f02906..00000000
--- a/scos_actions/hardware/tests/test_preselector.py
+++ /dev/null
@@ -1,12 +0,0 @@
-from environs import Env
-
-from scos_actions.hardware import load_preselector
-
-
-def test_load_preselector():
- preselector = load_preselector(
- {"name": "test", "base_url": "http://127.0.0.1"},
- "its_preselector.web_relay_preselector",
- "WebRelayPreselector",
- )
- assert preselector is not None
diff --git a/scos_actions/hardware/tests/test_sensor.py b/scos_actions/hardware/tests/test_sensor.py
new file mode 100644
index 00000000..216d626a
--- /dev/null
+++ b/scos_actions/hardware/tests/test_sensor.py
@@ -0,0 +1,74 @@
+from its_preselector.controlbyweb_web_relay import ControlByWebWebRelay
+from its_preselector.web_relay_preselector import WebRelayPreselector
+
+from scos_actions.hardware.mocks.mock_gps import MockGPS
+from scos_actions.hardware.mocks.mock_sigan import MockSignalAnalyzer
+from scos_actions.hardware.sensor import Sensor
+
+
+def test_sensor():
+ sensor = Sensor(
+ signal_analyzer=MockSignalAnalyzer(), capabilities={}, gps=MockGPS()
+ )
+ assert sensor is not None
+ assert sensor.signal_analyzer is not None
+ assert sensor.gps is not None
+
+
+def test_set_get_preselector():
+ preselector = WebRelayPreselector({}, {"name": "preselector", "base_url": "url"})
+ sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities={})
+ sensor.preselector = preselector
+ assert sensor.preselector == preselector
+
+
+def test_set_get_gps():
+ gps = MockGPS()
+ sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities={})
+ sensor.gps = gps
+ assert sensor.gps == gps
+
+
+def test_set_get_switches():
+ switches = {"spu": ControlByWebWebRelay({"name": "spu", "base_url": "url"})}
+ sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities={})
+ sensor.switches = switches
+ assert sensor.switches == switches
+
+
+def test_has_configurable_preselector_in_capabilities():
+ capabilities = {
+ "sensor": {
+ "preselector": {"rf_paths": [{"name": "antenna"}, {"name": "noise_diode"}]}
+ }
+ }
+ sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities=capabilities)
+ assert sensor.has_configurable_preselector == True
+
+
+def test_has_configurable_preselector_in_preselector():
+ sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities={})
+ sensor.preselector = WebRelayPreselector(
+ {}, {"name": "preselector", "base_url": "url"}
+ )
+ sensor.preselector.rf_paths = [{"name": "antenna"}, {"name": "noise_diode"}]
+ assert sensor.has_configurable_preselector == True
+
+
+def test_has_configurable_preselector_not_configurable():
+ capabilities = {"sensor": {"preselector": {"rf_paths": [{"name": "antenna"}]}}}
+ sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities=capabilities)
+ assert sensor.has_configurable_preselector == False
+
+
+def test_hash_set_when_not_present():
+ capabilities = {"sensor": {"preselector": {"rf_paths": [{"name": "antenna"}]}}}
+ sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities=capabilities)
+ assert "sensor_sha512" in sensor.capabilities["sensor"]
+ assert sensor.capabilities["sensor"]["sensor_sha512"] is not None
+
+
+def test_hash_not_overwritten():
+ capabilities = {"sensor": {"sensor_sha512": "some hash"}}
+ sensor = Sensor(signal_analyzer=MockSignalAnalyzer(), capabilities=capabilities)
+ assert sensor.capabilities["sensor"]["sensor_sha512"] == "some hash"
diff --git a/scos_actions/hardware/tests/test_sigan.py b/scos_actions/hardware/tests/test_sigan.py
index df980ecf..c82ffeec 100644
--- a/scos_actions/hardware/tests/test_sigan.py
+++ b/scos_actions/hardware/tests/test_sigan.py
@@ -1,5 +1,3 @@
-from environs import Env
-
from scos_actions.hardware.mocks.mock_sigan import MockSignalAnalyzer
diff --git a/scos_actions/hardware/utils.py b/scos_actions/hardware/utils.py
index 3f0ef0eb..49642da7 100644
--- a/scos_actions/hardware/utils.py
+++ b/scos_actions/hardware/utils.py
@@ -1,9 +1,10 @@
import logging
import subprocess
+from typing import Dict
import psutil
+from its_preselector.web_relay import WebRelay
-from scos_actions.hardware import switches
from scos_actions.hardware.hardware_configuration_exception import (
HardwareConfigurationException,
)
@@ -126,7 +127,7 @@ def get_max_cpu_temperature(fahrenheit: bool = False) -> float:
raise e
-def power_cycle_sigan():
+def power_cycle_sigan(switches: Dict[str, WebRelay]):
"""
Performs a hard power cycle of the signal analyzer. This method requires power to the signal analyzer is
controlled by a Web_Relay (see https://www.github.com/ntia/Preselector) and that the switch id of that
diff --git a/scos_actions/settings.py b/scos_actions/settings.py
index 00cf702a..fa63aaa1 100644
--- a/scos_actions/settings.py
+++ b/scos_actions/settings.py
@@ -2,7 +2,6 @@
from os import path
from pathlib import Path
-from django.conf import settings
from environs import Env
logger = logging.getLogger(__name__)
@@ -10,64 +9,29 @@
logger.debug("Initializing scos-actions settings")
CONFIG_DIR = Path(__file__).parent.resolve() / "configs"
+logger.debug(f"scos-actions: CONFIG_DIR:{CONFIG_DIR}")
ACTION_DEFINITIONS_DIR = CONFIG_DIR / "actions"
-
-if not settings.configured or not hasattr(settings, "DEFAULT_CALIBRATION_FILE"):
- DEFAULT_CALIBRATION_FILE = path.join(CONFIG_DIR, "default_calibration.json")
-else:
- DEFAULT_CALIBRATION_FILE = settings.DEFAULT_CALIBRATION_FILE
-
-# set sigan_calibration file and sensor_calibration_file
-if not settings.configured or not hasattr(settings, "SIGAN_CALIBRATION_FILE"):
- logger.warning("Sigan calibration file is not defined.")
- SIGAN_CALIBRATION_FILE = ""
- sigan_calibration = None
-else:
- SIGAN_CALIBRATION_FILE = settings.SIGAN_CALIBRATION_FILE
- logger.debug(f"SCOS_ACTIONS: SIGAN_CALIBRATION_FILE: {SIGAN_CALIBRATION_FILE}")
-
-if not settings.configured or not hasattr(settings, "SENSOR_CALIBRATION_FILE"):
- logger.warning("Sensor calibration file is not defined.")
- SENSOR_CALIBRATION_FILE = ""
- sensor_calibration = None
-else:
- SENSOR_CALIBRATION_FILE = settings.SENSOR_CALIBRATION_FILE
- logger.debug(f"SCOS_ACTIONS: SENSOR_CALIBRATION_FILE: {SENSOR_CALIBRATION_FILE}")
-
+logger.debug(f"scos-actions: ACTION_DEFINITIONS_DIR:{ACTION_DEFINITIONS_DIR}")
SWITCH_CONFIGS_DIR = env("SWITCH_CONFIGS_DIR", default=None)
-if not settings.configured:
- PRESELECTOR_CONFIG_FILE = None
- SENSOR_DEFINITION_FILE = None
- FQDN = None
- PRESELECTOR_MODULE = env("PRESELECTOR_MODULE", default=None)
- PRESELECTOR_CLASS = env("PRESELECTOR_CLASS", default=None)
- SIGAN_POWER_CYCLE_STATES = env("SIGAN_POWER_CYCLE_STATES", default=None)
- SIGAN_POWER_SWITCH = env("SIGAN_POWER_SWITCH", default=None)
- MOCK_SIGAN = env("MOCK_SIGAN", default=None)
- SCOS_SENSOR_GIT_TAG = env("SCOS_SENSOR_GIT_TAG", default="unknown")
-else:
- MOCK_SIGAN = settings.MOCK_SIGAN
- RUNNING_TESTS = settings.RUNNING_TESTS
- SENSOR_DEFINITION_FILE = Path(settings.SENSOR_DEFINITION_FILE)
- FQDN = settings.FQDN
- SCOS_SENSOR_GIT_TAG = settings.SCOS_SENSOR_GIT_TAG
- if settings.PRESELECTOR_CONFIG:
- PRESELECTOR_CONFIG_FILE = settings.PRESELECTOR_CONFIG
- else:
- PRESELECTOR_CONFIG_FILE = None
-
- if settings.PRESELECTOR_MODULE and settings.PRESELECTOR_CLASS:
- PRESELECTOR_MODULE = settings.PRESELECTOR_MODULE
- PRESELECTOR_CLASS = settings.PRESELECTOR_CLASS
- else:
- PRESELECTOR_MODULE = "its_preselector.web_relay_preselector"
- PRESELECTOR_CLASS = "WebRelayPreselector"
- if hasattr(settings, "SWITCH_CONFIGS_DIR"):
- SWITCH_CONFIGS_DIR = Path(settings.SWITCH_CONFIGS_DIR)
-
- SIGAN_POWER_SWITCH = None
- SIGAN_POWER_CYCLE_STATES = None
- if hasattr(settings, "SIGAN_POWER_SWITCH"):
- SIGAN_POWER_SWITCH = settings.SIGAN_POWER_SWITCH
- if hasattr(settings, "SIGAN_POWER_CYCLE_STATES"):
- SIGAN_POWER_CYCLE_STATES = settings.SIGAN_POWER_CYCLE_STATES
+if SWITCH_CONFIGS_DIR:
+ SWITCH_CONFIGS_DIR = Path(SWITCH_CONFIGS_DIR)
+logger.debug(f"scos-actions: SWITCH_CONFIGS_DIR:{SWITCH_CONFIGS_DIR}")
+SCOS_SENSOR_GIT_TAG = env("SCOS_SENSOR_GIT_TAG", default="unknown")
+logger.debug(f"scos-actions: SCOS_SENSOR_GIT_TAG:{SCOS_SENSOR_GIT_TAG}")
+MOCK_SIGAN = env.bool("MOCK_SIGAN", True)
+logger.debug(f"scos-actions: MOCK_SIGAN:{MOCK_SIGAN}")
+MOCK_SIGAN_RANDOM = env.bool("MOCK_SIGAN_RANDOM", default=False)
+logger.debug(f"scos-actions: MOCK_SIGAN_RANDOM:{MOCK_SIGAN_RANDOM}")
+RUNNING_TESTS = env.bool("RUNNING_TESTS", False)
+logger.debug(f"scos-actions: RUNNING_TESTS:{RUNNING_TESTS}")
+logger.debug(f"scos-actions: RUNNING_TESTS:{RUNNING_TESTS}")
+FQDN = env("FQDN", None)
+logger.debug(f"scos-actions: FQDN:{FQDN}")
+SIGAN_POWER_SWITCH = env("SIGAN_POWER_SWITCH", default=None)
+logger.debug(f"scos-actions: SIGAN_POWER_SWITCH:{SIGAN_POWER_SWITCH}")
+SIGAN_POWER_CYCLE_STATES = env("SIGAN_POWER_CYCLE_STATES", default=None)
+logger.debug(f"scos-actions: SIGAN_POWER_CYCLE_STATES:{SIGAN_POWER_CYCLE_STATES}")
+PRESELECTOR_MODULE = env("PRESELECTOR_MODULE", default=None)
+logger.debug(f"scos-actions: PRESELECTOR_MODULE:{PRESELECTOR_MODULE}")
+PRESELECTOR_CLASS = env("PRESELECTOR_CLASS", default=None)
+logger.debug(f"scos-actions: PRESELECTOR_CLASS:{PRESELECTOR_CLASS}")
diff --git a/scos_actions/signal_processing/calibration.py b/scos_actions/signal_processing/calibration.py
index aafa70c5..41ef5d71 100644
--- a/scos_actions/signal_processing/calibration.py
+++ b/scos_actions/signal_processing/calibration.py
@@ -1,10 +1,9 @@
import logging
-from typing import Tuple
+from typing import Optional, Tuple
import numpy as np
+from its_preselector.preselector import Preselector
from scipy.constants import Boltzmann
-
-from scos_actions.hardware import preselector
from scos_actions.signal_processing.unit_conversion import (
convert_celsius_to_fahrenheit,
convert_celsius_to_kelvins,
@@ -65,7 +64,9 @@ def y_factor(
return noise_figure_dB, gain_dB
-def get_linear_enr(cal_source_idx: int = None) -> float:
+def get_linear_enr(
+ preselector: Preselector, cal_source_idx: Optional[int] = None
+) -> float:
"""
Get the excess noise ratio of a calibration source.
@@ -75,6 +76,7 @@ def get_linear_enr(cal_source_idx: int = None) -> float:
The preselector is loaded from `scos_actions.hardware`.
+ :param preselector: The sensor preselector
:param cal_source_idx: The index of the specified
calibration source in `preselector.cal_sources`.
:return: The excess noise ratio of the specified
@@ -106,15 +108,16 @@ def get_linear_enr(cal_source_idx: int = None) -> float:
return enr_linear
-def get_temperature(sensor_idx: int = None) -> Tuple[float, float, float]:
+def get_temperature(
+ preselector: Preselector, sensor_idx: Optional[int] = None
+) -> Tuple[float, float, float]:
"""
Get the temperature from a preselector sensor.
The preselector is expected to be configured to return the
temperature in degrees Celsius.
- The preselector is loaded from `scos_actions.hardware`.
-
+ :param preselector: The sensor preselector.
:param sensor_idx: The index of the desired temperature
sensor in the preselector.
:raises CalibrationException: If no sensor index is provided, or
diff --git a/scos_actions/signals.py b/scos_actions/signals.py
index 36baaf3b..01a351f6 100644
--- a/scos_actions/signals.py
+++ b/scos_actions/signals.py
@@ -7,9 +7,3 @@
location_action_completed = Signal()
trigger_api_restart = Signal()
-
-# Provides argument: 'component'
-register_component_with_status = Signal()
-
-# Provides argument 'action'
-register_action = Signal()
diff --git a/scos_actions/status/__init__.py b/scos_actions/status/__init__.py
deleted file mode 100644
index f21e1efc..00000000
--- a/scos_actions/status/__init__.py
+++ /dev/null
@@ -1,7 +0,0 @@
-import datetime
-
-from .status_monitor import StatusMonitor
-
-status_registrar = StatusMonitor()
-
-start_time = datetime.datetime.utcnow()
diff --git a/scos_actions/status/status_monitor.py b/scos_actions/status/status_monitor.py
deleted file mode 100644
index dd76d00b..00000000
--- a/scos_actions/status/status_monitor.py
+++ /dev/null
@@ -1,12 +0,0 @@
-import logging
-
-logger = logging.getLogger(__name__)
-
-
-class StatusMonitor:
- def __init__(self):
- logger.debug("Initializing StatusMonitor")
- self.status_components = []
-
- def add_component(self, component):
- self.status_components.append(component)
diff --git a/scos_actions/status/status_registration_handler.py b/scos_actions/status/status_registration_handler.py
deleted file mode 100644
index aed88be3..00000000
--- a/scos_actions/status/status_registration_handler.py
+++ /dev/null
@@ -1,13 +0,0 @@
-import logging
-
-from . import status_registrar
-
-logger = logging.getLogger(__name__)
-
-
-def status_registration_handler(sender, **kwargs):
- try:
- logger.debug(f"Registering {sender} as status provider")
- status_registrar.add_component(kwargs["component"])
- except:
- logger.exception("Error registering status component")
diff --git a/scos_actions/utils.py b/scos_actions/utils.py
index 0fc5a73e..e70a4324 100644
--- a/scos_actions/utils.py
+++ b/scos_actions/utils.py
@@ -5,8 +5,6 @@
from dateutil import parser
-from scos_actions.status import start_time
-
logger = logging.getLogger(__name__)
@@ -121,7 +119,7 @@ def get_parameter(p: str, params: dict):
return params[p]
-def get_days_up():
+def get_days_up(start_time):
elapsed = datetime.utcnow() - start_time
days = elapsed.days
fractional_day = elapsed.seconds / (60 * 60 * 24)