Merge pull request #77 from NTIA/sea-action-garbage-collection
Ray Actor to supervise IQ processing in SEA action; add DSP unit tests
aromanielloNTIA committed May 8, 2023
2 parents fab8bdd + 0b07535 commit 36339d7
Showing 19 changed files with 1,226 additions and 301 deletions.
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -48,7 +48,7 @@ dependencies = [
"numpy>=1.22.0",
"psutil>=5.9.4",
"python-dateutil>=2.0",
"ray>=2.0.0",
"ray>=2.4.0",
"ruamel.yaml>=0.15",
"scipy>=1.8.0",
"sigmf @ git+https://github.com/NTIA/SigMF@multi-recording-archive",
@@ -64,6 +64,7 @@ dev = [
"scos-actions[test]",
"hatchling>=1.6.0,<2.0",
"pre-commit>=2.20.0",
"ray[default]>=2.4.0",
"twine>=4.0.1,<5.0",
]

2 changes: 1 addition & 1 deletion scos_actions/__init__.py
@@ -1 +1 @@
__version__ = "6.2.3"
__version__ = "6.2.4"
499 changes: 306 additions & 193 deletions scos_actions/actions/acquire_sea_data_product.py

Large diffs are not rendered by default.
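This file's diff is too large to render here. As context only, a minimal sketch of the supervising-actor pattern named in the commit title might look like the following; the IQProcessor class and its process method are hypothetical illustrations, not the PR's actual code, and only the standard Ray actor API (@ray.remote, .remote(), ray.get) is assumed.

import numpy as np
import ray

@ray.remote
class IQProcessor:
    # Hypothetical actor: IQ processing runs inside a dedicated Ray worker
    # process, so large intermediate arrays are reclaimed with the actor
    # instead of accumulating in the long-lived action process.
    def process(self, iq: np.ndarray) -> float:
        return float(np.mean(np.abs(iq) ** 2))  # e.g., mean power estimate

ray.init(ignore_reinit_error=True)
processor = IQProcessor.remote()
iq = np.ones(1024, dtype=np.complex64)
mean_power = ray.get(processor.process.remote(iq))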

4 changes: 2 additions & 2 deletions scos_actions/hardware/__init__.py
@@ -39,7 +39,7 @@ def load_switches(switch_dir: Path) -> dict:
return switch_dict


-def load_preslector_from_file(preselector_config_file: Path):
+def load_preselector_from_file(preselector_config_file: Path):
if preselector_config_file is None:
return None
else:
@@ -70,6 +70,6 @@ def load_preselector(preselector_config, module, preselector_class_name):

register_component_with_status.connect(status_registration_handler)
logger.info("Connected status registration handler")
-preselector = load_preslector_from_file(PRESELECTOR_CONFIG_FILE)
+preselector = load_preselector_from_file(PRESELECTOR_CONFIG_FILE)
switches = load_switches(SWITCH_CONFIGS_DIR)
logger.info("Loaded {switch_count} switches.".format(switch_count=(len(switches))))
6 changes: 6 additions & 0 deletions scos_actions/signal_processing/__init__.py
@@ -0,0 +1,6 @@
+# Above this number of array elements, perform computations with NumExpr
+# instead of with NumPy. NumExpr is slower for small arrays due to its
+# setup overhead. Once arrays reach hundreds of thousands of elements, the
+# speedup in the computation makes up for this overhead. This constant
+# is used in several of the signal processing modules.
+NUMEXPR_THRESHOLD = 200000
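As a sketch, the dispatch pattern this constant enables looks like the following; the magnitudes helper is illustrative, and the same if/else shape appears in apd.py and fft.py below.

import numexpr as ne
import numpy as np

from scos_actions.signal_processing import NUMEXPR_THRESHOLD

def magnitudes(iq: np.ndarray) -> np.ndarray:
    if iq.size < NUMEXPR_THRESHOLD:
        # Small input: NumExpr's setup overhead outweighs its parallel speedup.
        return np.abs(iq)
    # Large input: NumExpr evaluates the expression with multithreading.
    return ne.evaluate("abs(x).real", {"x": iq})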
55 changes: 30 additions & 25 deletions scos_actions/signal_processing/apd.py
@@ -1,12 +1,9 @@
-import logging
from typing import Tuple

import numexpr as ne
import numpy as np

-from scos_actions.signal_processing.unit_conversion import convert_linear_to_dB
-
-logger = logging.getLogger(__name__)
+from scos_actions.signal_processing import NUMEXPR_THRESHOLD


def get_apd(
Expand Down Expand Up @@ -46,38 +43,43 @@ def get_apd(
probabilities, and a contains the APD amplitudes.
"""
# Convert IQ to amplitudes
-all_amps = ne.evaluate("abs(x).real", {"x": time_data})
+if time_data.size < NUMEXPR_THRESHOLD:
+all_amps = np.abs(time_data)
+else:
+all_amps = ne.evaluate("abs(x).real", {"x": time_data})

# Replace any 0 value amplitudes with NaN
all_amps[all_amps == 0] = np.nan

# Convert amplitudes from V to pseudo-power
# Do not use utils.convert_linear_to_dB since the input
# here is always an array, and generally a large one.
ne.evaluate("20*log10(all_amps)", out=all_amps)
if time_data.size < NUMEXPR_THRESHOLD:
all_amps = 20.0 * np.log10(all_amps)
else:
ne.evaluate("20*log10(all_amps)", out=all_amps)

if bin_size_dB is None or bin_size_dB == 0:
-# No downsampling
-if min_bin is not None or max_bin is not None:
-logger.warning(
-f"APD bin edge specified but no downsampling is being performed"
-)
+downsampling = False
+# No downsampling. min_bin and max_bin ignored.
a = np.sort(all_amps)
p = 1 - ((np.arange(len(a)) + 1) / len(a))
# Replace peak amplitude 0 count with NaN
p[-1] = np.nan
else:
+downsampling = True
# Dynamically get bin edges if necessary
if min_bin is None:
logger.debug("Setting APD minimum bin edge to minimum recorded amplitude")
min_bin = np.nanmin(all_amps)
if max_bin is None:
logger.debug("Setting APD maximum bin edge to maximum recorded amplitude")
max_bin = np.nanmax(all_amps)
if min_bin >= max_bin:
-logger.error(
+raise ValueError(
f"Minimum APD bin {min_bin} is not less than maximum {max_bin}"
)
+# Check that downsampling range is evenly spanned by bins
+if not ((max_bin - min_bin) / bin_size_dB).is_integer():
+raise ValueError(
+"APD downsampling range is not evenly spanned by configured bin size."
+)
# Scale bin edges to the correct units if necessary
if impedance_ohms is not None:
min_bin, max_bin = (
@@ -88,21 +90,21 @@ def get_apd(
assert np.isclose(
a[1] - a[0], bin_size_dB
) # Checks against undesired arange behavior
-if not ((max_bin - min_bin) / bin_size_dB).is_integer():
-logger.debug(
-"APD downsampling range is not evenly spanned by configured bin size.\n"
-+ f"Check that the following amplitude bin edges are correct: {a}"
-)

# Get counts of amplitudes exceeding each bin value
p = sample_ccdf(all_amps, a)
# Replace any 0 probabilities with NaN
p[p == 0] = np.nan

# Scale to power if impedance value provided
if impedance_ohms is not None:
ne.evaluate("a-(10*log10(impedance_ohms))", out=a)

logger.debug(f"APD result length: {len(a)} samples.")
if downsampling:
a -= 10.0 * np.log10(impedance_ohms)
else:
if a.size < NUMEXPR_THRESHOLD:
a -= 10.0 * np.log10(impedance_ohms)
else:
ne.evaluate("a-(10*log10(impedance_ohms))", out=a)

return p, a

@@ -124,6 +126,9 @@ def sample_ccdf(a: np.ndarray, edges: np.ndarray, density: bool = True) -> np.nd
ccdf = (a.size - bin_counts.cumsum())[:-1]
if density:
ccdf = ccdf.astype("float64")
ne.evaluate("ccdf/a_size", {"ccdf": ccdf, "a_size": a.size}, out=ccdf)
if a.size < NUMEXPR_THRESHOLD:
ccdf /= a.size
else:
ne.evaluate("ccdf/a_size", {"ccdf": ccdf, "a_size": a.size}, out=ccdf)

return ccdf
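As a usage sketch (keyword names taken from the diff above; the full get_apd signature is not shown here), note that an amplitude range not evenly divisible by bin_size_dB now raises ValueError instead of logging a message:

import numpy as np

from scos_actions.signal_processing.apd import get_apd

rng = np.random.default_rng(0)
n = 10**6
iq = (rng.standard_normal(n) + 1j * rng.standard_normal(n)) / np.sqrt(2.0)

# 1 dB bins spanning -50 to +10 dB: the 60 dB range is an integer multiple
# of bin_size_dB, so the new validation passes; an uneven span would raise.
p, a = get_apd(iq, bin_size_dB=1.0, min_bin=-50.0, max_bin=10.0)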
2 changes: 1 addition & 1 deletion scos_actions/signal_processing/calibration.py
@@ -58,7 +58,7 @@ def y_factor(
logger.debug(f"Mean power off: {mean_off_dBm:.2f} dBm")
y = convert_dB_to_linear(mean_on_dBm - mean_off_dBm)
noise_factor = enr_linear / (y - 1.0)
-gain_dB = convert_watts_to_dBm(np.mean(pwr_noise_on_watts)) - convert_watts_to_dBm(
+gain_dB = mean_on_dBm - convert_watts_to_dBm(
Boltzmann * temp_kelvins * enbw_hz * (enr_linear + noise_factor)
)
noise_figure_dB = convert_linear_to_dB(noise_factor)
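The corrected line computes gain as the measured noise-on power over the ideal input-referred power k*T*B*(ENR + F). A simplified restatement in plain NumPy (a sketch, not the module's implementation; helper name and argument order are assumptions):

import numpy as np

BOLTZMANN = 1.380649e-23  # J/K

def y_factor_sketch(pwr_on_watts, pwr_off_watts, enr_linear, enbw_hz, temp_kelvins):
    # Y is the ratio of mean measured powers with the noise diode on/off.
    y = np.mean(pwr_on_watts) / np.mean(pwr_off_watts)
    noise_factor = enr_linear / (y - 1.0)
    # Gain: measured "on" power relative to k*T*B*(ENR + F) at the input.
    ideal_on_watts = BOLTZMANN * temp_kelvins * enbw_hz * (enr_linear + noise_factor)
    gain_dB = 10.0 * np.log10(np.mean(pwr_on_watts) / ideal_on_watts)
    noise_figure_dB = 10.0 * np.log10(noise_factor)
    return gain_dB, noise_figure_dB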
42 changes: 16 additions & 26 deletions scos_actions/signal_processing/fft.py
@@ -1,12 +1,11 @@
-import logging
import os

import numexpr as ne
import numpy as np
from scipy.fft import fft as sp_fft
from scipy.signal import get_window

-logger = logging.getLogger(__name__)
+from scos_actions.signal_processing import NUMEXPR_THRESHOLD


def get_fft(
@@ -63,52 +62,44 @@ def get_fft(
:return: The transformed input, scaled based on the specified
normalization mode.
"""
logger.debug("Computing FFTs")
# Make sure num_ffts and fft_size are integers
-if isinstance(fft_size, int) and isinstance(num_ffts, int):
-pass
-else:
-if fft_size == int(fft_size):
+if type(fft_size) is not int:
+if isinstance(fft_size, float) and fft_size.is_integer():
fft_size = int(fft_size)
else:
raise ValueError(f"fft_size must be an integer, not {type(fft_size)}.")
-if num_ffts == int(num_ffts):
+if type(num_ffts) is not int:
+if isinstance(num_ffts, float) and num_ffts.is_integer():
num_ffts = int(num_ffts)
else:
raise ValueError(f"num_ffts must be an integer, not {type(num_ffts)}.")

# Get num_ffts for default case: as many as possible
if num_ffts <= 0:
logger.info("Number of FFTs not specified. Using as many as possible.")
num_ffts = int(len(time_data) // fft_size)
-logger.info(
-f"Number of FFTs set to {num_ffts} based on specified FFT size {fft_size}"
-)

-# Determine if truncation will occur and raise a warning if so
+# Ensure that fft_size * num_ffts is the length of the input data
if len(time_data) != fft_size * num_ffts:
-thrown_away_samples = len(time_data) - (fft_size * num_ffts)
-msg = "Time domain data length is not divisible by num_ffts.\nTime"
-msg += "domain data will be truncated; Throwing away last "
-msg += f"{thrown_away_samples} sample(s)."
-logger.warning(msg)
+msg = "Time domain data length is not divisible by num_ffts * fft_size.\n"
+msg += "Try zero-padding the input or adjusting FFT parameters."
+raise ValueError(msg)

# Resize time data for FFTs
time_data = np.reshape(time_data[: num_ffts * fft_size], (num_ffts, fft_size))
-logger.debug(
-f"Num. FFTs: {num_ffts}, FFT Size: {fft_size}, Data shape: {time_data.shape}"
-)

# Apply the FFT window if provided
if fft_window is not None:
-time_data = ne.evaluate("time_data*fft_window")
+if time_data.size > NUMEXPR_THRESHOLD:
+ne.evaluate("time_data*fft_window", out=time_data, casting="same_kind")
+else:
+time_data *= fft_window

# Take the FFT
complex_fft = sp_fft(time_data, norm=norm, workers=workers)

# Shift the frequencies if desired
if shift:
-complex_fft = np.fft.fftshift(complex_fft)
+complex_fft = np.fft.fftshift(complex_fft, axes=(1,))
return complex_fft
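The reworked flow requires the input length to equal num_ffts * fft_size exactly. A self-contained sketch of the same segment-window-transform-shift sequence (the window and normalization choices here are illustrative, not the module's defaults):

import numpy as np
from scipy.fft import fft as sp_fft
from scipy.signal import get_window

num_ffts, fft_size = 8, 1024
rng = np.random.default_rng(0)
iq = rng.standard_normal(num_ffts * fft_size) + 0.0j  # exactly num_ffts * fft_size samples

window = get_window("flattop", fft_size, fftbins=True)
segments = iq.reshape(num_ffts, fft_size) * window  # apply the window to each row
spectra = sp_fft(segments, norm="forward", workers=4)
spectra = np.fft.fftshift(spectra, axes=(1,))  # shift along the FFT axis only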


@@ -156,7 +147,7 @@ def get_fft_window_correction(window: np.ndarray, correction_type: str) -> float
if correction_type == "amplitude":
window_correction = 1.0 / np.mean(window)
elif correction_type == "energy":
-window_correction = np.sqrt(1.0 / np.mean(window**2))
+window_correction = 1.0 / np.sqrt(np.mean(window**2))
else:
raise ValueError(f"Invalid window correction type: {correction_type}")

@@ -181,8 +172,7 @@ def get_fft_frequencies(
:return: A list of values representing the frequency axis of the
FFT.
"""
-time_step = 1.0 / sample_rate
-frequencies = np.fft.fftfreq(fft_size, time_step)
+frequencies = np.fft.fftfreq(fft_size, 1.0 / sample_rate)
frequencies = np.fft.fftshift(frequencies) + center_frequency
return frequencies.tolist()
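A usage sketch (keyword names from the snippet above; the values are illustrative):

# 1024-point FFT of data sampled at 14 MHz, tuned to a 700 MHz center
freqs = get_fft_frequencies(fft_size=1024, sample_rate=14e6, center_frequency=700e6)
# freqs spans center_frequency +/- sample_rate / 2, spaced sample_rate / fft_size apart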

17 changes: 1 addition & 16 deletions scos_actions/signal_processing/filtering.py
@@ -1,15 +1,8 @@
-import logging
-from multiprocessing.sharedctypes import Value
from typing import Tuple, Union

-import numexpr as ne
import numpy as np
from scipy.signal import ellip, ellipord, firwin, kaiserord, sos2zpk, sosfreqz

-from scos_actions.signal_processing.unit_conversion import convert_linear_to_dB
-
-logger = logging.getLogger(__name__)


def generate_elliptic_iir_low_pass_filter(
gpass_dB: float,
@@ -46,7 +39,6 @@ def generate_elliptic_iir_low_pass_filter(
pb_edge_Hz, sb_edge_Hz, gpass_dB, gstop_dB, False, sample_rate_Hz
)
sos = ellip(ord, gpass_dB, gstop_dB, wn, "lowpass", False, "sos", sample_rate_Hz)
logger.debug(f"Generated low-pass IIR filter with order {ord}.")
return sos


@@ -80,9 +72,6 @@ def generate_fir_low_pass_filter(
True,
fs=sample_rate_Hz,
)
-logger.debug(
-f"Generated Type {'I' if ord % 2 == 0 else 'II'} low-pass FIR filter with order {ord} and length {ord + 1}."
-)
return taps


@@ -150,10 +139,6 @@ def get_iir_enbw(
raise ValueError(
"Supplied frequency values must fall within +/- Nyquist frequency at baseband."
)
-logger.debug(
-f"Calculating filter ENBW using a frequency response of {len(worN)} points "
-+ f"from {min(worN)} Hz to {max(worN)} Hz."
-)
w, h = get_iir_frequency_response(sos, worN, sample_rate_Hz)
dw = np.mean(np.diff(w))
h = np.abs(h) ** 2.0
@@ -172,5 +157,5 @@ def is_stable(sos: np.ndarray) -> bool:
:return: True if the filter is stable, False if not.
"""
_, poles, _ = sos2zpk(sos)
-stable = all([True if p < 1 else False for p in np.square(np.abs(poles))])
+stable = all([p < 1 for p in np.square(np.abs(poles))])
return stable
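A quick check of the simplified stability test (designing directly with SciPy here; generate_elliptic_iir_low_pass_filter wraps a similar ellip call):

from scipy.signal import ellip

# 5th-order elliptic low-pass: 0.1 dB passband ripple, 40 dB stopband
# attenuation, normalized cutoff 0.25, emitted as SOS as is_stable expects.
sos = ellip(5, 0.1, 40.0, 0.25, "lowpass", output="sos")
print(is_stable(sos))  # True: every squared pole magnitude is below 1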