Skip to content

Commit

Permalink
Merge pull request #33 from jfsehuanes/fix_bwin_th
Browse files Browse the repository at this point in the history
Different methods are now available for setting bestwindow threshold. COMPLETE FOLDER WAS REFORMATTED!
  • Loading branch information
janscience authored Jun 24, 2016
2 parents 8edc44a + 14d5fe9 commit a97365a
Show file tree
Hide file tree
Showing 29 changed files with 1,361 additions and 1,212 deletions.
37 changes: 19 additions & 18 deletions tests/test_bestwindow.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,35 @@ def test_best_window():
# generate data:
rate = 100000.0
clip = 1.3
time = np.arange(0.0, 1.0, 1.0/rate)
time = np.arange(0.0, 1.0, 1.0 / rate)
snippets = []
f=600.0
amf=20.0
for ampl in [0.2, 0.5, 0.8] :
for am_ampl in [0.0, 0.3, 0.9] :
data = ampl*np.sin(2.0*np.pi*f*time)*(1.0+am_ampl*np.sin(2.0*np.pi*amf*time))
data[data>clip] = clip
data[data<-clip] = -clip
f = 600.0
amf = 20.0
for ampl in [0.2, 0.5, 0.8]:
for am_ampl in [0.0, 0.3, 0.9]:
data = ampl * np.sin(2.0 * np.pi * f * time) * (1.0 + am_ampl * np.sin(2.0 * np.pi * amf * time))
data[data > clip] = clip
data[data < -clip] = -clip
snippets.extend(data)
data = np.asarray(snippets)


# compute best window:
print("call bestwindow() function...")
idx0, idx1, clipped = bw.best_window_indices(data, rate, single=False,
win_size=1.0, win_shift=0.1, thresh_ampl_fac=2.0,
min_clip=-clip, max_clip=clip,
w_cv_ampl=10.0, tolerance=0.5)
win_size=1.0, win_shift=0.1, thresh_ampl_fac=2.0,
min_clip=-clip, max_clip=clip,
w_cv_ampl=10.0, tolerance=0.5)

assert_equal(idx0, 6*len(time), 'bestwindow() did not correctly detect start of best window')
assert_equal(idx1, 7*len(time), 'bestwindow() did not correctly detect end of best window')
assert_equal(idx0, 6 * len(time), 'bestwindow() did not correctly detect start of best window')
assert_equal(idx1, 7 * len(time), 'bestwindow() did not correctly detect end of best window')
assert_almost_equal(clipped, 0.0, 'bestwindow() did not correctly detect clipped fraction')

# clipping:
clip_win_size = 0.5
min_clip, max_clip = bw.clip_amplitudes(data, int(clip_win_size*rate),
min_fac=2.0,nbins=40)
min_clip, max_clip = bw.clip_amplitudes(data, int(clip_win_size * rate),
min_fac=2.0, nbins=40)

assert_true(min_clip<=-0.8*clip and min_clip>=-clip, 'clip_amplitudes() failed to detect minimum clip amplitude')
assert_true(max_clip>=0.8*clip and max_clip<=clip, 'clip_amplitudes() failed to detect maximum clip amplitude')
assert_true(min_clip <= -0.8 * clip and min_clip >= -clip,
'clip_amplitudes() failed to detect minimum clip amplitude')
assert_true(max_clip >= 0.8 * clip and max_clip <= clip,
'clip_amplitudes() failed to detect maximum clip amplitude')
4 changes: 2 additions & 2 deletions tests/test_configfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ def test_config_file():
cfg2.set('weightCVAmplitude', 20.0)
cfg2.set('clipBins', 300)
cfg3 = cf.ConfigFile(cfg2)

# read it in:
cfg2.load(cfgfile)
assert_equal(cfg, cfg2)

# read it in:
cfg3.load_files(cfgfile, 'data.dat')
assert_equal(cfg, cfg3)
Expand Down
115 changes: 55 additions & 60 deletions tests/test_peakdetection.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,126 +7,123 @@ def test_detect_peaks():
# generate data:
time = np.arange(0.0, 10.0, 0.01)
data = np.zeros(time.shape)
pt_indices = np.random.randint(5,len(data)-10, size=40)
pt_indices = np.random.randint(5, len(data) - 10, size=40)
pt_indices.sort()
while np.any(np.diff(pt_indices).min() < 5):
pt_indices = np.random.randint(5,len(data)-10, size=40)
pt_indices = np.random.randint(5, len(data) - 10, size=40)
pt_indices.sort()
peak_indices = pt_indices[0::2]
trough_indices = pt_indices[1::2]
n = pt_indices[0]
data[0:n] = 0.1+0.9*np.arange(0.0, n)/n
data[0:n] = 0.1 + 0.9 * np.arange(0.0, n) / n
up = False
for i in xrange(0,len(pt_indices)-1) :
n = pt_indices[i+1]-pt_indices[i]
if up :
data[pt_indices[i]:pt_indices[i+1]] = np.arange(0.0, n)/n
else :
data[pt_indices[i]:pt_indices[i+1]] = 1.0 - np.arange(0.0, n)/n
for i in xrange(0, len(pt_indices) - 1):
n = pt_indices[i + 1] - pt_indices[i]
if up:
data[pt_indices[i]:pt_indices[i + 1]] = np.arange(0.0, n) / n
else:
data[pt_indices[i]:pt_indices[i + 1]] = 1.0 - np.arange(0.0, n) / n
up = not up
n = len(data)-pt_indices[-1]
if up :
data[pt_indices[-1]:] = 0.8*np.arange(0.0, n)/n
else :
data[pt_indices[-1]:] = 1.0 - 0.8*np.arange(0.0, n)/n
n = len(data) - pt_indices[-1]
if up:
data[pt_indices[-1]:] = 0.8 * np.arange(0.0, n) / n
else:
data[pt_indices[-1]:] = 1.0 - 0.8 * np.arange(0.0, n) / n
up = not up
data += -0.025*time*(time-10.0)
data += -0.025 * time * (time - 10.0)
peak_times = time[peak_indices]
trough_times = time[trough_indices]
threshold = 0.5
min_thresh = 0.3


peaks, troughs = pd.detect_peaks(data, 0.0)
assert_true(np.all(peaks == np.array([])) and np.all(troughs == np.array([])),
"detect_peaks(data, threshold) did not handle zero threshold")

peaks, troughs = pd.detect_peaks(data, -1.0)
assert_true(np.all(peaks == np.array([])) and np.all(troughs == np.array([])),
"detect_peaks(data, threshold) did not handle negative threshold")
peaks, troughs = pd.detect_peaks(data, threshold, time[:len(time)/2])

peaks, troughs = pd.detect_peaks(data, threshold, time[:len(time) / 2])
assert_true(np.all(peaks == np.array([])) and np.all(troughs == np.array([])),
"detect_peaks(data, threshold) did not handle wrong time array")



peaks, troughs = pd.detect_peaks(data, threshold)
assert_true(np.all(peaks == peak_indices),
"detect_peaks(data, threshold) did not correctly detect peaks")
assert_true(np.all(troughs == trough_indices),
"detect_peaks(data, threshold) did not correctly detect troughs")

peaks, troughs = pd.detect_peaks(data, threshold, time)
assert_true(np.all(peaks == peak_times),
"detect_peaks(data, threshold, time) did not correctly detect peaks")
assert_true(np.all(troughs == trough_times),
"detect_peaks(data, threshold, time) did not correctly detect troughs")

peaks, troughs = pd.detect_peaks(data, threshold, time,
pd.accept_peak, pd.accept_peak)
assert_true(np.all(peaks[:,0] == peak_indices),
assert_true(np.all(peaks[:, 0] == peak_indices),
"detect_peaks(data, threshold, time, accept_peak, accept_peak) did not correctly detect peaks")
assert_true(np.all(troughs[:,0] == trough_indices),
assert_true(np.all(troughs[:, 0] == trough_indices),
"detect_peaks(data, threshold, time, accept_peak, accept_peak) did not correctly detect troughs")
assert_true(np.all(peaks[:,1] == peak_times),
assert_true(np.all(peaks[:, 1] == peak_times),
"detect_peaks(data, threshold, time, accept_peak, accept_peak) did not correctly detect peaks")
assert_true(np.all(troughs[:,1] == trough_times),
assert_true(np.all(troughs[:, 1] == trough_times),
"detect_peaks(data, threshold, time, accept_peak, accept_peak) did not correctly detect troughs")

peaks, troughs = pd.detect_peaks(data, threshold, time,
pd.accept_peaks_size_width)
assert_true(np.all(peaks[:,0] == peak_times),
assert_true(np.all(peaks[:, 0] == peak_times),
"detect_peaks(data, threshold, time, accept_peaks_size_width) did not correctly detect peaks")
assert_true(np.all(troughs == trough_times),
"detect_peaks(data, threshold, time, accept_peak, accept_peaks_size_width) did not correctly detect troughs")

peaks, troughs = pd.detect_peaks(data, threshold, None,
pd.accept_peak, pd.accept_peak)
assert_true(np.all(peaks[:,0] == peak_indices),
assert_true(np.all(peaks[:, 0] == peak_indices),
"detect_peaks(data, threshold, time, accept_peak, accept_peak) did not correctly detect peaks")
assert_true(np.all(troughs[:,0] == trough_indices),
assert_true(np.all(troughs[:, 0] == trough_indices),
"detect_peaks(data, threshold, time, accept_peak, accept_peak) did not correctly detect troughs")


def test_detect_dynamic_peaks():
# generate data:
time = np.arange(0.0, 10.0, 0.01)
data = np.zeros(time.shape)
pt_indices = np.random.randint(5,len(data)-10, size=40)
pt_indices = np.random.randint(5, len(data) - 10, size=40)
pt_indices.sort()
while np.any(np.diff(pt_indices).min() < 5):
pt_indices = np.random.randint(5,len(data)-10, size=40)
pt_indices = np.random.randint(5, len(data) - 10, size=40)
pt_indices.sort()
peak_indices = pt_indices[0::2]
trough_indices = pt_indices[1::2]
n = pt_indices[0]
data[0:n] = 0.1+0.9*np.arange(0.0, n)/n
data[0:n] = 0.1 + 0.9 * np.arange(0.0, n) / n
up = False
for i in xrange(0,len(pt_indices)-1) :
n = pt_indices[i+1]-pt_indices[i]
if up :
data[pt_indices[i]:pt_indices[i+1]] = np.arange(0.0, n)/n
else :
data[pt_indices[i]:pt_indices[i+1]] = 1.0 - np.arange(0.0, n)/n
for i in xrange(0, len(pt_indices) - 1):
n = pt_indices[i + 1] - pt_indices[i]
if up:
data[pt_indices[i]:pt_indices[i + 1]] = np.arange(0.0, n) / n
else:
data[pt_indices[i]:pt_indices[i + 1]] = 1.0 - np.arange(0.0, n) / n
up = not up
n = len(data)-pt_indices[-1]
if up :
data[pt_indices[-1]:] = 0.8*np.arange(0.0, n)/n
else :
data[pt_indices[-1]:] = 1.0 - 0.8*np.arange(0.0, n)/n
n = len(data) - pt_indices[-1]
if up:
data[pt_indices[-1]:] = 0.8 * np.arange(0.0, n) / n
else:
data[pt_indices[-1]:] = 1.0 - 0.8 * np.arange(0.0, n) / n
up = not up
data += -0.025*time*(time-10.0)
data += -0.025 * time * (time - 10.0)
peak_times = time[peak_indices]
trough_times = time[trough_indices]
threshold = 0.5
min_thresh = 0.3


peaks, troughs = pd.detect_dynamic_peaks(data, 0.0, min_thresh, 0.5, time,
pd.accept_peak_size_threshold)
assert_true(np.all(peaks == np.array([])) and np.all(troughs == np.array([])),
"detect_dynamic_peaks(data, threshold) did not handle zero threshold")

peaks, troughs = pd.detect_dynamic_peaks(data, -1.0, min_thresh, 0.5, time,
pd.accept_peak_size_threshold)
assert_true(np.all(peaks == np.array([])) and np.all(troughs == np.array([])),
Expand All @@ -136,7 +133,7 @@ def test_detect_dynamic_peaks():
pd.accept_peak_size_threshold)
assert_true(np.all(peaks == np.array([])) and np.all(troughs == np.array([])),
"detect_dynamic_peaks(data, threshold) did not handle zero min_thresh")

peaks, troughs = pd.detect_dynamic_peaks(data, threshold, -1.0, 0.5, time,
pd.accept_peak_size_threshold)
assert_true(np.all(peaks == np.array([])) and np.all(troughs == np.array([])),
Expand All @@ -146,33 +143,31 @@ def test_detect_dynamic_peaks():
pd.accept_peak_size_threshold)
assert_true(np.all(peaks == np.array([])) and np.all(troughs == np.array([])),
"detect_dynamic_peaks(data, threshold) did not handle zero tau")

peaks, troughs = pd.detect_dynamic_peaks(data, threshold, min_thresh, -1.0, time,
pd.accept_peak_size_threshold)
assert_true(np.all(peaks == np.array([])) and np.all(troughs == np.array([])),
"detect_dynamic_peaks(data, threshold) did not handle negative tau")
peaks, troughs = pd.detect_dynamic_peaks(data, threshold, min_thresh, 0.5, time[:len(time)/2],

peaks, troughs = pd.detect_dynamic_peaks(data, threshold, min_thresh, 0.5, time[:len(time) / 2],
pd.accept_peak_size_threshold)
assert_true(np.all(peaks == np.array([])) and np.all(troughs == np.array([])),
"detect_dynamic_peaks(data, threshold) did not handle wrong time array")


peaks, troughs = pd.detect_dynamic_peaks(data, threshold, min_thresh, 0.5, time,
pd.accept_peak_size_threshold)
assert_true(np.all(peaks == peak_times),
"detect_dynamic_peaks(data, threshold, time, accept_peak_size_threshold) did not correctly detect peaks")
assert_true(np.all(troughs == trough_times),
"detect_dynamic_peaks(data, threshold, time, accept_peak_size_threshold) did not correctly detect troughs")

peaks, troughs = pd.detect_dynamic_peaks(data, threshold, min_thresh, 0.5, None,
pd.accept_peak_size_threshold,
thresh_ampl_fac=0.9, thresh_weight=0.1)
assert_true(np.all(peaks == peak_indices),
"detect_dynamic_peaks(data, threshold, time, accept_peak_size_threshold) did not correctly detect peaks")
assert_true(np.all(troughs == trough_indices),
"detect_dynamic_peaks(data, threshold, time, accept_peak_size_threshold) did not correctly detect troughs")



def test_trim():
Expand Down Expand Up @@ -209,7 +204,7 @@ def test_trim():
"trim(peak_indices[1:-2], trough_indices) failed on peaks")
assert_true(len(t_inx) == len(trough_indices[:-3]) and np.all(t_inx == trough_indices[:-3]),
"trim(peak_indices[1:-2], trough_indices) failed on troughs")


def test_trim_to_peak():
# generate peak and trough indices (same length, peaks first):
Expand Down Expand Up @@ -245,11 +240,11 @@ def test_trim_to_peak():
"trim_to_peak(peak_indices[1:-2], trough_indices) failed on peaks")
assert_true(len(t_inx) == len(trough_indices[1:-2]) and np.all(t_inx == trough_indices[1:-2]),
"trim_to_peak(peak_indices[1:-2], trough_indices) failed on troughs")


def test_trim_closest():
# generate peak and trough indices (same length, peaks first):
pt_indices = np.random.randint(5, 100, size=40)*10
pt_indices = np.random.randint(5, 100, size=40) * 10
pt_indices.sort()
peak_indices = pt_indices

Expand Down
Loading

0 comments on commit a97365a

Please sign in to comment.