Skip to content

Commit

Permalink
Merge 1ced8ec into 9cc1064
Browse files Browse the repository at this point in the history
  • Loading branch information
janscience committed May 25, 2016
2 parents 9cc1064 + 1ced8ec commit dd8c795
Show file tree
Hide file tree
Showing 19 changed files with 2,525 additions and 1,397 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@
*.pyc
*.npy
*.pdf
*.txt
*.txt
.coverage
*~
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,7 @@

# thunderfish

this project is meant to analyze transects, segregate fishes in wave or pulse type fishes
and generate a Histogram of EOD-Frequencies.
This repository is a collection of algorithms and programs for
analysing electric field recordings of weakly electric fish. In
particular we focus on recordings made in natural habitats.

17 changes: 0 additions & 17 deletions tests/test_auxiliary.py

This file was deleted.

40 changes: 40 additions & 0 deletions tests/test_bestwindow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from nose.tools import assert_true, assert_equal, assert_almost_equal
import numpy as np
import thunderfish.bestwindow as bw


def test_best_window():
# generate data:
rate = 100000.0
clip = 1.3
time = np.arange(0.0, 1.0, 1.0/rate)
snippets = []
f=600.0
amf=20.0
for ampl in [0.2, 0.5, 0.8] :
for am_ampl in [0.0, 0.3, 0.9] :
data = ampl*np.sin(2.0*np.pi*f*time)*(1.0+am_ampl*np.sin(2.0*np.pi*amf*time))
data[data>clip] = clip
data[data<-clip] = -clip
snippets.extend(data)
data = np.asarray(snippets)


# compute best window:
print("call bestwindow() function...")
idx0, idx1, clipped = bw.best_window_indices(data, rate, single=False,
win_size=1.0, win_shift=0.1, thresh_ampl_fac=2.0,
min_clip=-clip, max_clip=clip,
w_cv_ampl=10.0, tolerance=0.5)

assert_equal(idx0, 6*len(time), 'bestwindow() did not correctly detect start of best window')
assert_equal(idx1, 7*len(time), 'bestwindow() did not correctly detect end of best window')
assert_almost_equal(clipped, 0.0, 'bestwindow() did not correctly detect clipped fraction')

# clipping:
clip_win_size = 0.5
min_clip, max_clip = bw.clip_amplitudes(data, int(clip_win_size*rate),
min_fac=2.0,nbins=40)

assert_true(min_clip<=-0.8*clip and min_clip>=-clip, 'clip_amplitudes() failed to detect minimum clip amplitude')
assert_true(max_clip>=0.8*clip and max_clip<=clip, 'clip_amplitudes() failed to detect maximum clip amplitude')
34 changes: 34 additions & 0 deletions tests/test_configfile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from nose.tools import assert_equal
import thunderfish.configfile as cf
import thunderfish.bestwindow as bw
from collections import OrderedDict
import os


def test_config_file():
cfg = cf.ConfigFile()
bw.add_clip_config(cfg)
bw.add_best_window_config(cfg)

cfgfile = 'test.cfg'

# write configuration to a file:
cfg.dump(cfgfile, 'header', 50)

# manipulate some values:
cfg2 = cf.ConfigFile(cfg)
cfg2.set('bestWindowSize', 100.0)
cfg2.set('weightCVAmplitude', 20.0)
cfg2.set('clipBins', 300)
cfg3 = cf.ConfigFile(cfg2)

# read it in:
cfg2.load(cfgfile)
assert_equal(cfg, cfg2)

# read it in:
cfg3.load_files(cfgfile, 'data.dat')
assert_equal(cfg, cfg3)

# clean up:
os.remove(cfgfile)
280 changes: 280 additions & 0 deletions tests/test_peakdetection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
from nose.tools import assert_true
import numpy as np
import thunderfish.peakdetection as pd


def test_detect_peaks():
# generate data:
time = np.arange(0.0, 10.0, 0.01)
data = np.zeros(time.shape)
pt_indices = np.random.randint(5,len(data)-10, size=40)
pt_indices.sort()
while np.any(np.diff(pt_indices).min() < 5):
pt_indices = np.random.randint(5,len(data)-10, size=40)
pt_indices.sort()
peak_indices = pt_indices[0::2]
trough_indices = pt_indices[1::2]
n = pt_indices[0]
data[0:n] = 0.1+0.9*np.arange(0.0, n)/n
up = False
for i in xrange(0,len(pt_indices)-1) :
n = pt_indices[i+1]-pt_indices[i]
if up :
data[pt_indices[i]:pt_indices[i+1]] = np.arange(0.0, n)/n
else :
data[pt_indices[i]:pt_indices[i+1]] = 1.0 - np.arange(0.0, n)/n
up = not up
n = len(data)-pt_indices[-1]
if up :
data[pt_indices[-1]:] = 0.8*np.arange(0.0, n)/n
else :
data[pt_indices[-1]:] = 1.0 - 0.8*np.arange(0.0, n)/n
up = not up
data += -0.025*time*(time-10.0)
peak_times = time[peak_indices]
trough_times = time[trough_indices]
threshold = 0.5
min_thresh = 0.3


peaks, troughs = pd.detect_peaks(data, 0.0)
assert_true(np.all(peaks == np.array([])) and np.all(troughs == np.array([])),
"detect_peaks(data, threshold) did not handle zero threshold")

peaks, troughs = pd.detect_peaks(data, -1.0)
assert_true(np.all(peaks == np.array([])) and np.all(troughs == np.array([])),
"detect_peaks(data, threshold) did not handle negative threshold")

peaks, troughs = pd.detect_peaks(data, threshold, time[:len(time)/2])
assert_true(np.all(peaks == np.array([])) and np.all(troughs == np.array([])),
"detect_peaks(data, threshold) did not handle wrong time array")


peaks, troughs = pd.detect_peaks(data, threshold)
assert_true(np.all(peaks == peak_indices),
"detect_peaks(data, threshold) did not correctly detect peaks")
assert_true(np.all(troughs == trough_indices),
"detect_peaks(data, threshold) did not correctly detect troughs")

peaks, troughs = pd.detect_peaks(data, threshold, time)
assert_true(np.all(peaks == peak_times),
"detect_peaks(data, threshold, time) did not correctly detect peaks")
assert_true(np.all(troughs == trough_times),
"detect_peaks(data, threshold, time) did not correctly detect troughs")

peaks, troughs = pd.detect_peaks(data, threshold, time,
pd.accept_peak, pd.accept_peak)
assert_true(np.all(peaks[:,0] == peak_indices),
"detect_peaks(data, threshold, time, accept_peak, accept_peak) did not correctly detect peaks")
assert_true(np.all(troughs[:,0] == trough_indices),
"detect_peaks(data, threshold, time, accept_peak, accept_peak) did not correctly detect troughs")
assert_true(np.all(peaks[:,1] == peak_times),
"detect_peaks(data, threshold, time, accept_peak, accept_peak) did not correctly detect peaks")
assert_true(np.all(troughs[:,1] == trough_times),
"detect_peaks(data, threshold, time, accept_peak, accept_peak) did not correctly detect troughs")

peaks, troughs = pd.detect_peaks(data, threshold, time,
pd.accept_peaks_size_width)
assert_true(np.all(peaks[:,0] == peak_times),
"detect_peaks(data, threshold, time, accept_peaks_size_width) did not correctly detect peaks")
assert_true(np.all(troughs == trough_times),
"detect_peaks(data, threshold, time, accept_peak, accept_peaks_size_width) did not correctly detect troughs")

peaks, troughs = pd.detect_peaks(data, threshold, None,
pd.accept_peak, pd.accept_peak)
assert_true(np.all(peaks[:,0] == peak_indices),
"detect_peaks(data, threshold, time, accept_peak, accept_peak) did not correctly detect peaks")
assert_true(np.all(troughs[:,0] == trough_indices),
"detect_peaks(data, threshold, time, accept_peak, accept_peak) did not correctly detect troughs")


def test_detect_dynamic_peaks():
# generate data:
time = np.arange(0.0, 10.0, 0.01)
data = np.zeros(time.shape)
pt_indices = np.random.randint(5,len(data)-10, size=40)
pt_indices.sort()
while np.any(np.diff(pt_indices).min() < 5):
pt_indices = np.random.randint(5,len(data)-10, size=40)
pt_indices.sort()
peak_indices = pt_indices[0::2]
trough_indices = pt_indices[1::2]
n = pt_indices[0]
data[0:n] = 0.1+0.9*np.arange(0.0, n)/n
up = False
for i in xrange(0,len(pt_indices)-1) :
n = pt_indices[i+1]-pt_indices[i]
if up :
data[pt_indices[i]:pt_indices[i+1]] = np.arange(0.0, n)/n
else :
data[pt_indices[i]:pt_indices[i+1]] = 1.0 - np.arange(0.0, n)/n
up = not up
n = len(data)-pt_indices[-1]
if up :
data[pt_indices[-1]:] = 0.8*np.arange(0.0, n)/n
else :
data[pt_indices[-1]:] = 1.0 - 0.8*np.arange(0.0, n)/n
up = not up
data += -0.025*time*(time-10.0)
peak_times = time[peak_indices]
trough_times = time[trough_indices]
threshold = 0.5
min_thresh = 0.3


peaks, troughs = pd.detect_dynamic_peaks(data, 0.0, min_thresh, 0.5, time,
pd.accept_peak_size_threshold)
assert_true(np.all(peaks == np.array([])) and np.all(troughs == np.array([])),
"detect_dynamic_peaks(data, threshold) did not handle zero threshold")

peaks, troughs = pd.detect_dynamic_peaks(data, -1.0, min_thresh, 0.5, time,
pd.accept_peak_size_threshold)
assert_true(np.all(peaks == np.array([])) and np.all(troughs == np.array([])),
"detect_dynamic_peaks(data, threshold) did not handle negative threshold")

peaks, troughs = pd.detect_dynamic_peaks(data, threshold, 0.0, 0.5, time,
pd.accept_peak_size_threshold)
assert_true(np.all(peaks == np.array([])) and np.all(troughs == np.array([])),
"detect_dynamic_peaks(data, threshold) did not handle zero min_thresh")

peaks, troughs = pd.detect_dynamic_peaks(data, threshold, -1.0, 0.5, time,
pd.accept_peak_size_threshold)
assert_true(np.all(peaks == np.array([])) and np.all(troughs == np.array([])),
"detect_dynamic_peaks(data, threshold) did not handle negative min_thresh")

peaks, troughs = pd.detect_dynamic_peaks(data, threshold, min_thresh, 0.0, time,
pd.accept_peak_size_threshold)
assert_true(np.all(peaks == np.array([])) and np.all(troughs == np.array([])),
"detect_dynamic_peaks(data, threshold) did not handle zero tau")

peaks, troughs = pd.detect_dynamic_peaks(data, threshold, min_thresh, -1.0, time,
pd.accept_peak_size_threshold)
assert_true(np.all(peaks == np.array([])) and np.all(troughs == np.array([])),
"detect_dynamic_peaks(data, threshold) did not handle negative tau")

peaks, troughs = pd.detect_dynamic_peaks(data, threshold, min_thresh, 0.5, time[:len(time)/2],
pd.accept_peak_size_threshold)
assert_true(np.all(peaks == np.array([])) and np.all(troughs == np.array([])),
"detect_dynamic_peaks(data, threshold) did not handle wrong time array")


peaks, troughs = pd.detect_dynamic_peaks(data, threshold, min_thresh, 0.5, time,
pd.accept_peak_size_threshold)
assert_true(np.all(peaks == peak_times),
"detect_dynamic_peaks(data, threshold, time, accept_peak_size_threshold) did not correctly detect peaks")
assert_true(np.all(troughs == trough_times),
"detect_dynamic_peaks(data, threshold, time, accept_peak_size_threshold) did not correctly detect troughs")

peaks, troughs = pd.detect_dynamic_peaks(data, threshold, min_thresh, 0.5, None,
pd.accept_peak_size_threshold,
thresh_ampl_fac=0.9, thresh_weight=0.1)
assert_true(np.all(peaks == peak_indices),
"detect_dynamic_peaks(data, threshold, time, accept_peak_size_threshold) did not correctly detect peaks")
assert_true(np.all(troughs == trough_indices),
"detect_dynamic_peaks(data, threshold, time, accept_peak_size_threshold) did not correctly detect troughs")



def test_trim():
# generate peak and trough indices (same length, peaks first):
pt_indices = np.random.randint(5, 1000, size=40)
pt_indices.sort()
peak_indices = pt_indices[0::2]
trough_indices = pt_indices[1::2]

# peak first, same length:
p_inx, t_inx = pd.trim(peak_indices, trough_indices)
assert_true(len(p_inx) == len(peak_indices) and np.all(p_inx == peak_indices),
"trim(peak_indices, trough_indices) failed on peaks")
assert_true(len(t_inx) == len(trough_indices) and np.all(t_inx == trough_indices),
"trim(peak_indices, trough_indices) failed on troughs")

# trough first, same length:
p_inx, t_inx = pd.trim(peak_indices[1:], trough_indices[:-1])
assert_true(len(p_inx) == len(peak_indices[1:]) and np.all(p_inx == peak_indices[1:]),
"trim(peak_indices[1:], trough_indices[:-1]) failed on peaks")
assert_true(len(t_inx) == len(trough_indices[:-1]) and np.all(t_inx == trough_indices[:-1]),
"trim(peak_indices[1:], trough_indices[:-1]) failed on troughs")

# peak first, more peaks:
p_inx, t_inx = pd.trim(peak_indices, trough_indices[:-2])
assert_true(len(p_inx) == len(peak_indices[:-2]) and np.all(p_inx == peak_indices[:-2]),
"trim(peak_indices, trough_indices[:-2]) failed on peaks")
assert_true(len(t_inx) == len(trough_indices[:-2]) and np.all(t_inx == trough_indices[:-2]),
"trim(peak_indices, trough_indices[:-2]) failed on troughs")

# trough first, more troughs:
p_inx, t_inx = pd.trim(peak_indices[1:-2], trough_indices)
assert_true(len(p_inx) == len(peak_indices[1:-2]) and np.all(p_inx == peak_indices[1:-2]),
"trim(peak_indices[1:-2], trough_indices) failed on peaks")
assert_true(len(t_inx) == len(trough_indices[:-3]) and np.all(t_inx == trough_indices[:-3]),
"trim(peak_indices[1:-2], trough_indices) failed on troughs")


def test_trim_to_peak():
# generate peak and trough indices (same length, peaks first):
pt_indices = np.random.randint(5, 1000, size=40)
pt_indices.sort()
peak_indices = pt_indices[0::2]
trough_indices = pt_indices[1::2]

# peak first, same length:
p_inx, t_inx = pd.trim_to_peak(peak_indices, trough_indices)
assert_true(len(p_inx) == len(peak_indices) and np.all(p_inx == peak_indices),
"trim_to_peak(peak_indices, trough_indices) failed on peaks")
assert_true(len(t_inx) == len(trough_indices) and np.all(t_inx == trough_indices),
"trim_to_peak(peak_indices, trough_indices) failed on troughs")

# trough first, same length:
p_inx, t_inx = pd.trim_to_peak(peak_indices[1:], trough_indices[:-1])
assert_true(len(p_inx) == len(peak_indices[1:-1]) and np.all(p_inx == peak_indices[1:-1]),
"trim_to_peak(peak_indices[1:], trough_indices[:-1]) failed on peaks")
assert_true(len(t_inx) == len(trough_indices[1:-1]) and np.all(t_inx == trough_indices[1:-1]),
"trim_to_peak(peak_indices[1:], trough_indices[:-1]) failed on troughs")

# peak first, more peaks:
p_inx, t_inx = pd.trim_to_peak(peak_indices, trough_indices[:-2])
assert_true(len(p_inx) == len(peak_indices[:-2]) and np.all(p_inx == peak_indices[:-2]),
"trim_to_peak(peak_indices, trough_indices[:-2]) failed on peaks")
assert_true(len(t_inx) == len(trough_indices[:-2]) and np.all(t_inx == trough_indices[:-2]),
"trim_to_peak(peak_indices, trough_indices[:-2]) failed on troughs")

# trough first, more troughs:
p_inx, t_inx = pd.trim_to_peak(peak_indices[1:-2], trough_indices)
assert_true(len(p_inx) == len(peak_indices[1:-2]) and np.all(p_inx == peak_indices[1:-2]),
"trim_to_peak(peak_indices[1:-2], trough_indices) failed on peaks")
assert_true(len(t_inx) == len(trough_indices[1:-2]) and np.all(t_inx == trough_indices[1:-2]),
"trim_to_peak(peak_indices[1:-2], trough_indices) failed on troughs")


def test_trim_closest():
# generate peak and trough indices (same length, peaks first):
pt_indices = np.random.randint(5, 100, size=40)*10
pt_indices.sort()
peak_indices = pt_indices

trough_indices = peak_indices - np.random.randint(1, 5, size=len(peak_indices))
p_inx, t_inx = pd.trim_closest(peak_indices, trough_indices)
assert_true(len(p_inx) == len(peak_indices) and np.all(p_inx == peak_indices),
"trim_closest(peak_indices, peak_indices-5) failed on peaks")
assert_true(len(t_inx) == len(trough_indices) and np.all(t_inx == trough_indices),
"trim_closest(peak_indices, peak_indices-5) failed on troughs")

p_inx, t_inx = pd.trim_closest(peak_indices[1:], trough_indices[:-1])
assert_true(len(p_inx) == len(peak_indices[1:-1]) and np.all(p_inx == peak_indices[1:-1]),
"trim_closest(peak_indices[1:], peak_indices-5) failed on peaks")
assert_true(len(t_inx) == len(trough_indices[1:-1]) and np.all(t_inx == trough_indices[1:-1]),
"trim_closest(peak_indices[1:], peak_indices-5) failed on troughs")

trough_indices = peak_indices + np.random.randint(1, 5, size=len(peak_indices))
p_inx, t_inx = pd.trim_closest(peak_indices, trough_indices)
assert_true(len(p_inx) == len(peak_indices) and np.all(p_inx == peak_indices),
"trim_closest(peak_indices, peak_indices+5) failed on peaks")
assert_true(len(t_inx) == len(trough_indices) and np.all(t_inx == trough_indices),
"trim_closest(peak_indices, peak_indices+5) failed on troughs")

p_inx, t_inx = pd.trim_closest(np.array([]), np.array([]))
assert_true(len(p_inx) == 0,
"trim_closest([], []) failed on peaks")
assert_true(len(t_inx) == 0,
"trim_closest([], []) failed on troughs")
Loading

0 comments on commit dd8c795

Please sign in to comment.