Skip to content

Commit

Permalink
Fix buffer issue in highest density regions, adds tests (#591)
Browse files Browse the repository at this point in the history
* Fix buffer issue in highest density regions, adds tests

* Add final new line

Co-authored-by: Joran Angevaare <jorana@nikhef.nl>
  • Loading branch information
WenzDaniel and JoranAngevaare committed Nov 18, 2021
1 parent cfd7992 commit 1c9b066
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 30 deletions.
48 changes: 29 additions & 19 deletions strax/processing/hitlets.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,12 +515,12 @@ def _conditional_entropy(hitlets, template, flat=False, square_data=False):
return res


@numba.njit
@numba.njit(cache=True)
def highest_density_region_width(data,
fractions_desired,
dt=1,
fractionl_edges=False,
_buffer_size=100):
fractions_desired,
dt=1,
fractionl_edges=False,
_buffer_size=100):
"""
Function which computes the left and right edge based on the outer
most sample for the highest density region of a signal.
Expand All @@ -536,25 +536,33 @@ def highest_density_region_width(data,
:param fractionl_edges: If true computes width as fractional time
depending on the covered area between the current and next
sample.
:param _buffer_size: Maximal number of allowed intervals.
:param _buffer_size: Maximal number of allowed intervals. If signal
exceeds number e.g. due to noise width computation is skipped.
"""
res = np.zeros((len(fractions_desired), 2), dtype=np.float32)
data = np.maximum(data, 0)

if np.all(data == 0):
res[:] = np.nan
return res
else:
inter, amps = strax.highest_density_region(data,
fractions_desired, _buffer_size=_buffer_size)

for f_ind, (i, a) in enumerate(zip(inter, amps)):
inter, amps = strax.highest_density_region(data,
fractions_desired,
_buffer_size=_buffer_size,
)

for index_area_fraction, (interval_indicies, area_fraction_amplitude) in enumerate(zip(inter, amps)):
if np.all(interval_indicies[:] == -1):
res[index_area_fraction, :] = np.nan
continue

if not fractionl_edges:
res[f_ind, 0] = i[0, 0] * dt
res[f_ind, 1] = i[1, np.argmax(i[1, :])] * dt
res[index_area_fraction, 0] = interval_indicies[0, 0] * dt
res[index_area_fraction, 1] = interval_indicies[1, np.argmax(interval_indicies[1, :])] * dt
else:
left = i[0, 0]
right = i[1, np.argmax(i[1, :])] - 1 # since value corresponds to outer edge
left = interval_indicies[0, 0]
# -1 since value corresponds to outer edge:
right = interval_indicies[1, np.argmax(interval_indicies[1, :])] - 1

# Get amplitudes of outer most samples
# and amplitudes of adjacent samples (if any)
Expand All @@ -568,11 +576,13 @@ def highest_density_region_width(data,
if (right + 1) < len(data):
next_right_amp = data[right + 1]

# Compute fractions and new left and right edges:
fl = (left_amp - a) / (left_amp - next_left_amp)
fr = (right_amp - a) / (right_amp - next_right_amp)
# Compute fractions and new left and right edges, the case
# left_amp == next_left_amp cannot occure by the definition of the highest density
# region.
fl = (left_amp - area_fraction_amplitude) / (left_amp - next_left_amp)
fr = (right_amp - area_fraction_amplitude) / (right_amp - next_right_amp)

res[f_ind, 0] = (left + 0.5 - fl) * dt
res[f_ind, 1] = (right + 0.5 + fr) * dt
res[index_area_fraction, 0] = (left + 0.5 - fl) * dt
res[index_area_fraction, 1] = (right + 0.5 + fr) * dt

return res
24 changes: 13 additions & 11 deletions strax/processing/statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
export, __all__ = strax.exporter()

@export
@numba.njit()
@numba.njit(cache=True)
def highest_density_region(data, fractions_desired, _buffer_size=10):
"""
Computes for a given sampled distribution the highest density region
Expand Down Expand Up @@ -86,24 +86,26 @@ def highest_density_region(data, fractions_desired, _buffer_size=10):
g_ind = -1
diff = ind[1:] - ind[:-1]
gaps = gaps[:-1][diff > 1]
if len(gaps):
mes = ('Found more intervals than "_buffer_size" allows please'
'increase the size of the buffer.')
assert len(gaps) < _buffer_size, mes

if len(gaps) > _buffer_size:
# This signal has more boundaries than the buffer can hold
# hence set all entries to -1 instead.
res[fi, 0, :] = -1
res[fi, 1, :] = -1
fi += 1
else:
for g_ind, g in enumerate(gaps):
# Loop over all gaps and get outer edges:
interval = ind[g0:g]
res[fi, 0, g_ind] = interval[0]
res[fi, 1, g_ind] = interval[-1] + 1
g0 = g

# Now we have to do the last interval:
interval = ind[g0:]
res[fi, 0, g_ind + 1] = interval[0]
res[fi, 1, g_ind + 1] = interval[-1] + 1

fi += 1
# Now we have to do the last interval:
interval = ind[g0:]
res[fi, 0, g_ind + 1] = interval[0]
res[fi, 1, g_ind + 1] = interval[-1] + 1
fi += 1

if fi == (len(fractions_desired)):
# Found all fractions so we are done
Expand Down
17 changes: 17 additions & 0 deletions tests/test_statistics.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import strax
import numpy as np
from strax.processing.hitlets import highest_density_region_width


def test_highest_density_region():
"""
Expand All @@ -25,3 +27,18 @@ def _test_highest_density_region(distribution, truth_dict):
mes = f'Have not found the correct edges for a fraction of {key}% found {int_found}, but expected ' \
f'{interval}'
assert np.all(int_found == interval), mes


def test_too_small_buffer():
"""
Unit test to check whether a too small buffer leads to np.nans
"""
distribution = np.ones(1000)
distribution[::4] = 0
indicies, _ = strax.highest_density_region(distribution, np.array([0.5]))
assert np.all(indicies == -1)

width = highest_density_region_width(distribution,
fractions_desired=np.array([0.5]),
_buffer_size=10)
assert np.all(np.isnan(width))

0 comments on commit 1c9b066

Please sign in to comment.