Skip to content

Commit

Permalink
add docstrings and type hints
Browse files Browse the repository at this point in the history
  • Loading branch information
aromanielloNTIA committed May 3, 2023
1 parent 6b4dbef commit 9fdc5c3
Showing 1 changed file with 83 additions and 4 deletions.
87 changes: 83 additions & 4 deletions scos_actions/actions/acquire_sea_data_product.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ def run(self, iq: ray.ObjectRef) -> np.ndarray:
"""
Compute power spectral densities from IQ samples.
:param iq: Complex-valued input waveform samples.
:param iq: Complex-valued input waveform samples (which should
already exist in the Ray object store).
:return: A 2D NumPy array of statistical detector results computed
from PSD amplitudes, ordered (max, mean).
"""
Expand All @@ -183,6 +184,15 @@ def run(self, iq: ray.ObjectRef) -> np.ndarray:

@ray.remote
class AmplitudeProbabilityDistribution:
"""
Compute a downsampled amplitude probability distribution from IQ samples.
The result is the probability axis of the downsampled APD. Downsampling is
configurable through defining a bin size, minimum bin value, and maximum bin
value for binning amplitudes. The amplitude data can be reconstructed if these
three parameters are known.
"""

def __init__(
self,
bin_size_dB: float,
Expand All @@ -203,7 +213,8 @@ def run(self, iq: ray.ObjectRef) -> np.ndarray:
"""
Generate the downsampled APD result from IQ samples.
:param iq: Complex-valued input waveform samples.
:param iq: Complex-valued input waveform samples (which should
already exist in the Ray object store).
:return: A NumPy array containing the APD probability axis as percentages.
"""
p, _ = get_apd(
Expand All @@ -218,6 +229,16 @@ def run(self, iq: ray.ObjectRef) -> np.ndarray:

@ray.remote
class PowerVsTime:
"""
Compute mean/max PVT results and summary statistics from IQ samples.
The results are two NumPy arrays. The first contains the full PVT detector
results (``detector`` results with ``bin_size_ms`` resolution). The second
contains two values: the maximum of the first detector result, and the median
of the second detector result. These are used to obtain max-of-max and median-of-
mean channel power summary statistics.
"""

def __init__(
self,
sample_rate_Hz: float,
Expand All @@ -229,7 +250,17 @@ def __init__(
self.detector = detector
self.impedance_ohms = impedance_ohms

def run(self, iq: ray.ObjectRef):
def run(self, iq: ray.ObjectRef) -> Tuple[np.ndarray, np.ndarray]:
"""
Compute power versus time results from IQ samples.
:param iq: Complex-valued input waveform samples (which should
already exist in the Ray object store).
:return: Two NumPy arrays: the first has shape (2, 400) and
the second is 1D with length 2. The first array contains
the (max, mean) detector results and the second array contains
the (max-of-max, median-of-mean) summary statistics.
"""
# Reshape IQ data into blocks and calculate power
n_blocks = len(iq) // self.block_size
iq_pwr = calculate_power_watts(
Expand All @@ -251,6 +282,15 @@ def run(self, iq: ray.ObjectRef):

@ray.remote
class PeriodicFramePower:
"""
Compute periodic frame power results from IQ samples.
The result is a 2D NumPy array, with the shape depending on the
parameters which initialize this worker. In current form, the result
is a (6, 560) NumPy array. Power samples are chunked into periodic frames,
and statistical detectors are applied twice.
"""

def __init__(
self,
sample_rate_Hz: float,
Expand Down Expand Up @@ -282,6 +322,15 @@ def __init__(
)

def run(self, iq: ray.ObjectRef) -> np.ndarray:
"""
Compute periodic frame power results from IQ samples.
:param iq: Complex-valued input waveform samples (which should
already exist in the Ray object store).
:return: A (6, 560) NumPy array containing the PFP results.
The order is (min-of-mean, max-of-mean, mean-of-mean,
min-of-max, max-of-max, mean-of-max).
"""
# Set up dimensions to make the statistics fast
chunked_shape = (
iq.shape[0] // self.n_frames,
Expand Down Expand Up @@ -309,6 +358,29 @@ def run(self, iq: ray.ObjectRef) -> np.ndarray:

@ray.remote
class IQProcessor:
"""
Supervisor actor for IQ processing.
Note: the current implementation in ``__call__`` makes the
assumption that all ``params`` used are identical for each
consecutive capture (i.e., each channel). The ``params`` which
fall under this assumption are: ``SAMPLE_RATE``, ``NUM_FFTS``,
``TD_BIN_SIZE_MS``, ``PFP_FRAME_PERIOD_MS``, ``APD_BIN_SIZE_DB``,
``APD_MIN_BIN_DBM``, and ``APD_MAX_BIN_DBM``. A single set of IIR
second-order-sections (``iir_sos``) is also used for filtering.
Upon initialization of this supervisor actor, workers are spawned
for each of the components of the data product. Initializing these
processes allows for certain parts of the computation, which do not
depend on the IQ data, to be performed ahead-of-time. The workers are
stateful, and reuse the initialized values for each consecutive run.
The ``run`` method can be called to filter and process IQ samples.
Filtering happens before the remote workers are called, which run
concurrently. The ``run`` method returns Ray object references
immediately, which can be later used to retrieve the procesed results.
"""

def __init__(self, params: dict, iir_sos: np.ndarray):
# initialize worker processes
self.iir_sos = iir_sos
Expand All @@ -332,7 +404,14 @@ def __init__(self, params: dict, iir_sos: np.ndarray):
]
del params

def run(self, iqdata: np.ndarray):
def run(self, iqdata: np.ndarray) -> list:
"""
Filter the input IQ data and concurrently compute FFT, PVT, PFP, and APD results.
:param iqdata: Complex-valued input waveform samples.
:return: A list of Ray object references which can be used to
retrieve the processed results. The order is [FFT, PVT, PFP, APD].
"""
# Filter IQ and place it in the object store
iqdata = ray.put(sosfilt(self.iir_sos, iqdata))
# Compute PSD, PVT, PFP, and APD concurrently.
Expand Down

0 comments on commit 9fdc5c3

Please sign in to comment.