Skip to content

Commit

Permalink
Merge pull request #79 from NTIA/OperandArrayError
Browse files Browse the repository at this point in the history
Fix operand array error
  • Loading branch information
aromanielloNTIA committed May 10, 2023
2 parents 36339d7 + 23bb18b commit cc91045
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 19 deletions.
4 changes: 3 additions & 1 deletion scos_actions/signal_processing/fft.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,12 @@ def get_fft(

# Resize time data for FFTs
time_data = np.reshape(time_data[: num_ffts * fft_size], (num_ffts, fft_size))

# Apply the FFT window if provided
if fft_window is not None:
if time_data.size > NUMEXPR_THRESHOLD:
time_data = (
time_data.copy()
) # Avoids operand array error on read-only input data
ne.evaluate("time_data*fft_window", out=time_data, casting="same_kind")
else:
time_data *= fft_window
Expand Down
46 changes: 32 additions & 14 deletions scos_actions/signal_processing/tests/test_apd.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import itertools

import numexpr as ne
import numpy as np
import pytest

from scos_actions.signal_processing import apd
from scos_actions.signal_processing import NUMEXPR_THRESHOLD, apd

rng = np.random.default_rng()

Expand All @@ -20,6 +22,14 @@ def example_iq_data():
return samps


@pytest.fixture
def example_large_iq_data():
n_samps = NUMEXPR_THRESHOLD + 1
std_dev = np.sqrt(2) / 2.0
samps = rng.normal(0, std_dev, n_samps) + 1j * rng.normal(0, std_dev, n_samps)
return samps


def test_get_apd_nan_handling():
# All zero amplitudes should be converted to NaN
# Peak amplitude 0 count should be replaced with NaN
Expand All @@ -29,15 +39,20 @@ def test_get_apd_nan_handling():
assert all(np.isnan(a))


def test_get_apd_no_downsample(example_iq_data):
def test_get_apd_no_downsample(example_iq_data, example_large_iq_data):
bin_sizes = [None, 0]
immutable = [True, False]
impedance = 50
for bin_size in bin_sizes:
apd_result = apd.get_apd(example_iq_data, bin_size)
for bin_size, readonly, iq in itertools.product(
bin_sizes, immutable, (example_iq_data, example_large_iq_data)
):
if readonly:
iq.setflags(write=False)
apd_result = apd.get_apd(iq, bin_size)
assert isinstance(apd_result, tuple)
assert len(apd_result) == 2
assert all(isinstance(x, np.ndarray) for x in apd_result)
assert all(len(x) == len(example_iq_data) for x in apd_result)
assert all(len(x) == len(iq) for x in apd_result)
p, a = apd_result
assert not any(x == 0 for x in a)
np.testing.assert_equal(a, np.real(a))
Expand All @@ -47,25 +62,28 @@ def test_get_apd_no_downsample(example_iq_data):
assert all(p[i + 1] <= p[i] for i in range(len(p) - 2))
assert np.isnan(p[-1])
# Check against version with impedance provided
scaled_p, scaled_a = apd.get_apd(
example_iq_data, bin_size, impedance_ohms=impedance
)
scaled_p, scaled_a = apd.get_apd(iq, bin_size, impedance_ohms=impedance)
np.testing.assert_allclose(a - 10.0 * np.log10(impedance), scaled_a)
np.testing.assert_array_equal(p, scaled_p)


def test_get_apd_downsample(example_iq_data):
def test_get_apd_downsample(example_iq_data, example_large_iq_data):
with pytest.raises(ValueError):
_ = apd.get_apd(example_iq_data, 0.5, 100, 99)
with pytest.raises(ValueError):
_ = apd.get_apd(example_iq_data, 1.0, 90, 100.6)
bin_sizes = [1.0, 0.5, 0.25]
for bin_size in bin_sizes:
min_bin = np.nanmin(ne.evaluate("20*log10(abs(example_iq_data).real)"))
max_bin = np.nanmax(ne.evaluate("20*log10(abs(example_iq_data).real)"))
p, a = apd.get_apd(example_iq_data, bin_size, round(min_bin), round(max_bin))
immutable = [True, False]
for bin_size, readonly, iq in itertools.product(
bin_sizes, immutable, (example_iq_data, example_large_iq_data)
):
if readonly:
iq.setflags(write=False)
min_bin = np.nanmin(ne.evaluate("20*log10(abs(iq).real)"))
max_bin = np.nanmax(ne.evaluate("20*log10(abs(iq).real)"))
p, a = apd.get_apd(iq, bin_size, round(min_bin), round(max_bin))
assert len(p) == len(a)
assert len(p) < len(example_iq_data)
assert len(p) < len(iq)
np.testing.assert_equal(a, np.real(a))
assert all(a[i] <= a[i + 1] for i in range(len(a) - 1))
np.testing.assert_allclose(np.diff(a), np.ones(len(a) - 1) * bin_size)
Expand Down
30 changes: 28 additions & 2 deletions scos_actions/signal_processing/tests/test_fft.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pytest
from scipy.signal import get_window

from scos_actions.signal_processing import fft
from scos_actions.signal_processing import NUMEXPR_THRESHOLD, fft

# Define the correct calculations. Simplified expressions are used
# in fft.get_fft_window_correction and fft.get_fft_enbw
Expand Down Expand Up @@ -64,14 +64,20 @@ def iq(length=num_ffts * fft_size, amplitude=signal_amplitude):

# Test with no window or normalization
result = fft.get_fft(iq(), fft_size, "forward", None, num_ffts, False)
# Check return type/shape just once
assert isinstance(result, np.ndarray)
assert result.dtype == np.complex128
assert result.shape == (num_ffts, fft_size)
# Results here should be signal_amplitude in DC bin, zero elsewhere
np.testing.assert_allclose(result[:, 0], np.ones(num_ffts) * signal_amplitude)
np.testing.assert_allclose(result[:, 1:], np.zeros((num_ffts, fft_size - 1)))

# Test on immutable array
iqdata_readonly = iq()
iqdata_readonly.setflags(write=False)
result = fft.get_fft(iqdata_readonly, fft_size, "forward", None, num_ffts, False)
np.testing.assert_allclose(result[:, 0], np.ones(num_ffts) * signal_amplitude)
np.testing.assert_allclose(result[:, 1:], np.zeros((num_ffts, fft_size - 1)))

# Test window provided case
result = fft.get_fft(iq(), fft_size, "forward", window, num_ffts, False)
# Results here should be signal_amplitude * window_acf in DC bin
Expand All @@ -97,6 +103,26 @@ def iq(length=num_ffts * fft_size, amplitude=signal_amplitude):
result[:, fft_size // 2 + 1 :], np.zeros((num_ffts, fft_size // 2 - 1))
)

# Test large input case (when NumExpr is used in get_fft to apply windowing)
num_ffts = (NUMEXPR_THRESHOLD // fft_size) + 1
result = fft.get_fft(
iq(num_ffts * fft_size), fft_size, "forward", window, num_ffts, False
)
# Results here should be signal_amplitude * window_acf in DC bin
np.testing.assert_allclose(
result[:, 0] * window_acf, np.ones(num_ffts) * signal_amplitude
)

# Test large immutable input with windowing
iqdatalarge_readonly = iq(num_ffts * fft_size)
iqdatalarge_readonly.setflags(write=False)
result = fft.get_fft(
iqdatalarge_readonly, fft_size, "forward", window, num_ffts, False
)
np.testing.assert_allclose(
result[:, 0] * window_acf, np.ones(num_ffts) * signal_amplitude
)


def test_get_fft_window():
# These window types are supported with SciPy >=1.8.0
Expand Down
11 changes: 9 additions & 2 deletions scos_actions/signal_processing/tests/test_power_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ def large_array():
return np.ones(NUMEXPR_THRESHOLD * 2) * TEST_VAL_CPLX


@pytest.fixture
def large_readonly_array():
x = np.ones(NUMEXPR_THRESHOLD * 2) * TEST_VAL_CPLX
x.setflags(write=False)
return x


@pytest.fixture
def small_array():
return np.ones(NUMEXPR_THRESHOLD // 2) * TEST_VAL_CPLX
Expand All @@ -44,8 +51,8 @@ def true_scalars(complex_scalar, real_scalar):


@pytest.fixture
def all_arrays(large_array, small_array, scalar_array):
return [large_array, small_array, scalar_array]
def all_arrays(large_array, small_array, scalar_array, large_readonly_array):
return [large_array, small_array, scalar_array, large_readonly_array]


def test_calculate_power_watts(all_arrays, true_scalars):
Expand Down
12 changes: 12 additions & 0 deletions scos_actions/signal_processing/tests/test_unit_conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ def test_convert(val, correct_val):
test_convert(int(v), c)
test_convert(np.ones(small_array_len) * v, c)
test_convert(np.ones(NUMEXPR_THRESHOLD) * v, c)
large_readonly = np.ones(NUMEXPR_THRESHOLD) * v
large_readonly.setflags(write=False)
test_convert(large_readonly, c)


def test_convert_dBm_to_watts(small_array_len):
Expand All @@ -58,6 +61,9 @@ def test_convert(val, correct_val):
test_convert(int(v), c)
test_convert(np.ones(small_array_len) * v, c)
test_convert(np.ones(NUMEXPR_THRESHOLD) * v, c)
large_readonly = np.ones(NUMEXPR_THRESHOLD) * v
large_readonly.setflags(write=False)
test_convert(large_readonly, c)


def test_convert_linear_to_dB(small_array_len):
Expand All @@ -79,6 +85,9 @@ def test_convert(val, correct_val):
test_convert(int(v), c)
test_convert(np.ones(small_array_len) * v, c)
test_convert(np.ones(NUMEXPR_THRESHOLD) * v, c)
large_readonly = np.ones(NUMEXPR_THRESHOLD) * v
large_readonly.setflags(write=False)
test_convert(large_readonly, c)


def test_convert_dB_to_linear(small_array_len):
Expand All @@ -100,6 +109,9 @@ def test_convert(val, correct_val):
test_convert(int(v), c)
test_convert(np.ones(small_array_len) * v, c)
test_convert(np.ones(NUMEXPR_THRESHOLD) * v, c)
large_readonly = np.ones(NUMEXPR_THRESHOLD) * v
large_readonly.setflags(write=False)
test_convert(large_readonly, c)


def test_convert_kelvins_to_celsius(small_array_len):
Expand Down

0 comments on commit cc91045

Please sign in to comment.