Skip to content

Commit

Permalink
Merge pull request #1052 from tjma12/gating
Browse files Browse the repository at this point in the history
Added basic gating as TimeSeries method
  • Loading branch information
Duncan Macleod committed Feb 27, 2019
2 parents edaa6f5 + be6f502 commit 68d5bd3
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 0 deletions.
31 changes: 31 additions & 0 deletions gwpy/timeseries/tests/test_timeseries.py
Expand Up @@ -836,6 +836,37 @@ def test_inject(self):
utils.assert_allclose(new_data.value[ind], waveform.value)
utils.assert_allclose(data.value, numpy.zeros(duration*sample_rate))

@utils.skip_minimum_version("scipy", "1.1.0")
def test_gate(self):
# generate Gaussian noise with std = 0.5
noise = self.TEST_CLASS(numpy.random.normal(scale=0.5, size=16384*64),
sample_rate=16384, epoch=-32)
# generate a glitch with amplitude 20 at 1000 Hz
glitchtime = 0.0
glitch = signal.gausspulse(noise.times.value - glitchtime,
bw=100) * 20
data = noise + glitch

# check that the glitch is at glitchtime as expected
tmax = data.times.value[data.argmax()]
nptest.assert_almost_equal(tmax, glitchtime)

# gating method will be called with whiten = False to decouple
# whitening method from gating method
tzero = 1.0
tpad = 1.0
threshold = 10.0
gated = data.gate(tzero=tzero, tpad=tpad, threshold=threshold,
whiten=False)

# check that the maximum value is not within the region set to zero
tleft = glitchtime - tzero
tright = glitchtime + tzero
assert not tleft < gated.times.value[gated.argmax()] < tright

# check that there are no remaining values above the threshold
assert gated.max() < threshold

def test_whiten(self):
# create noise with a glitch in it at 1000 Hz
noise = self.TEST_CLASS(
Expand Down
93 changes: 93 additions & 0 deletions gwpy/timeseries/timeseries.py
Expand Up @@ -1567,6 +1567,99 @@ def whiten(self, fftlength=None, overlap=0, method=DEFAULT_FFT_METHOD,
out = in_.convolve(tdw, window=window)
return out * numpy.sqrt(2 * in_.dt.decompose().value)

def gate(self, tzero=1.0, tpad=0.5, whiten=True,
threshold=50., cluster_window=0.5, **whiten_kwargs):
"""Removes high amplitude peaks from data using inverse Planck window.
Points will be discovered automatically using a provided threshold
and clustered within a provided time window.
Parameters
----------
tzero : `int`, optional
half-width time duration in which the time series is set to zero
tpad : `int`, optional
half-width time duration in which the Planck window is tapered
whiten : `bool`, optional
if True, data will be whitened before gating points are discovered,
use of this option is highly recommended
threshold : `float`, optional
amplitude threshold, if the data exceeds this value a gating window
will be placed
cluster_window : `float`, optional
time duration over which gating points will be clustered
**whiten_kwargs
other keyword arguments that will be passed to the
`TimeSeries.whiten` method if it is being used when discovering
gating points
Returns
-------
out : `~gwpy.timeseries.TimeSeries`
a copy of the original `TimeSeries` that has had gating windows
applied
Examples
--------
Read data into a `TimeSeries`
>>> from gwpy.timeseries import TimeSeries
>>> data = TimeSeries.fetch_open_data('H1', 1135148571, 1135148771)
Apply gating using custom arguments
>>> gated = data.gate(tzero=1.0, tpad=1.0, threshold=10.0,
fftlength=4, overlap=2, method='median')
Plot the original data and the gated data, whiten both for
visualization purposes
>>> overlay = data.whiten(4,2,method='median').plot(dpi=150,
label='Ungated', color='dodgerblue',
zorder=2)
>>> ax = overlay.gca()
>>> ax.plot(gated.whiten(4,2,method='median'), label='Gated',
color='orange', zorder=3)
>>> ax.set_xlim(1135148661, 1135148681)
>>> ax.legend()
>>> overlay.show()
"""
try:
from scipy.signal import find_peaks
except ImportError as exc:
exc.args = ("Must have scipy>=1.1.0 to utilize this method.",)
raise
# Find points to gate based on a threshold
data = self.whiten(**whiten_kwargs) if whiten else self
window_samples = cluster_window * data.sample_rate.value
gates = find_peaks(abs(data.value), height=threshold,
distance=window_samples)[0]
out = self.copy()

# Iterate over list of indices to gate and apply each one
nzero = int(abs(tzero) * self.sample_rate.value)
npad = int(abs(tpad) * self.sample_rate.value)
half = nzero + npad
ntotal = 2 * half
for gate in gates:
# Set the boundaries for windowed data in the original time series
left_idx = max(0, gate - half)
right_idx = min(gate + half, len(self.value) - 1)

# Choose which part of the window will replace the data
# This must be done explicitly for edge cases where a window
# overlaps index 0 or the end of the time series
left_idx_window = half - (gate - left_idx)
right_idx_window = half + (right_idx - gate)

window = 1 - planck(ntotal, nleft=npad, nright=npad)
window = window[left_idx_window:right_idx_window]
out[left_idx:right_idx] *= window

return out

def convolve(self, fir, window='hanning'):
"""Convolve this `TimeSeries` with an FIR filter using the
overlap-save method
Expand Down

0 comments on commit 68d5bd3

Please sign in to comment.