-
Notifications
You must be signed in to change notification settings - Fork 86
/
pre_processing.py
504 lines (451 loc) · 19.2 KB
/
pre_processing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
"""
Utilities module whose functions are designed to do the basic \
processing of the data using obspy modules (which also rely on scipy and \
numpy).
:copyright:
EQcorrscan developers.
:license:
GNU Lesser General Public License, Version 3
(https://www.gnu.org/copyleft/lesser.html)
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import numpy as np
import warnings
import datetime as dt
from multiprocessing import Pool, cpu_count
from obspy import Stream, Trace, UTCDateTime
from obspy.signal.filter import bandpass, lowpass, highpass
def _check_daylong(tr):
"""
Check the data quality of the daylong file.
Check to see that the day isn't just zeros, with large steps, if it is
then the resampling will hate it.
:type tr: obspy.core.trace.Trace
:param tr: Trace to check if the data are daylong.
:return quality (simply good or bad)
:rtype: bool
.. rubric:: Example
>>> from obspy import read
>>> from eqcorrscan.utils.pre_processing import _check_daylong
>>> st = read('eqcorrscan/tests/test_data/WAV/TEST_/' +
... '2013-09-01-0410-35.DFDPC_024_00')
>>> _check_daylong(st[0])
True
"""
if len(np.nonzero(tr.data)[0]) < 0.5 * len(tr.data):
qual = False
else:
qual = True
return qual
def shortproc(st, lowcut, highcut, filt_order, samp_rate, debug=0,
              parallel=False, num_cores=False, starttime=None, endtime=None,
              seisan_chan_names=False):
    """
    Basic function to bandpass and downsample.
    Works in place on data. This is employed to ensure all parts of the
    data are processed in the same way.
    :type st: obspy.core.stream.Stream
    :param st: Stream to process
    :type lowcut: float
    :param lowcut: Low cut for bandpass in Hz
    :type highcut: float
    :param highcut: High cut for bandpass in Hz
    :type filt_order: int
    :param filt_order: Number of corners for bandpass filter
    :type samp_rate: float
    :param samp_rate: Sampling rate desired in Hz
    :type debug: int
    :param debug: Debug flag from 0-5, higher numbers = more output
    :type parallel: bool
    :param parallel: Set to True to process traces in parallel, for small \
        numbers of traces this is often slower than serial processing, \
        defaults to False
    :type num_cores: int
    :param num_cores: Control the number of cores for parallel processing, \
        if set to False then this will use all the cores.
    :type starttime: obspy.core.utcdatetime.UTCDateTime
    :param starttime:
        Desired data start time, will trim to this before processing
    :type endtime: obspy.core.utcdatetime.UTCDateTime
    :param endtime:
        Desired data end time, will trim to this before processing
    :type seisan_chan_names: bool
    :param seisan_chan_names:
        Whether channels are named like seisan channels (which are two letters
        rather than SEED convention of three) - defaults to True.
    :return: Processed stream
    :rtype: :class:`obspy.core.stream.Stream`
    .. note:: Will convert channel names to two characters long.
    .. warning::
        If you intend to use this for processing templates you should consider
        how resampling will impact your cross-correlations. Minor differences
        in resampling between day-long files (which you are likely to use for
        continuous detection) and shorter files will reduce your
        cross-correlations!
    .. rubric:: Example, bandpass
    >>> from obspy import read
    >>> from eqcorrscan.utils.pre_processing import shortproc
    >>> st = read('eqcorrscan/tests/test_data/WAV/TEST_/' +
    ...           '2013-09-01-0410-35.DFDPC_024_00')
    >>> st = shortproc(st=st, lowcut=2, highcut=9, filt_order=3, samp_rate=20,
    ...                debug=0, parallel=True, num_cores=2)
    >>> print(st[0])
    AF.LABE..SHZ | 2013-09-01T04:10:35.700000Z - 2013-09-01T04:12:05.650000Z \
| 20.0 Hz, 1800 samples
    .. rubric:: Example, low-pass
    >>> from obspy import read
    >>> from eqcorrscan.utils.pre_processing import shortproc
    >>> st = read('eqcorrscan/tests/test_data/WAV/TEST_/' +
    ...           '2013-09-01-0410-35.DFDPC_024_00')
    >>> st = shortproc(st=st, lowcut=None, highcut=9, filt_order=3,
    ...                samp_rate=20, debug=0)
    >>> print(st[0])
    AF.LABE..SHZ | 2013-09-01T04:10:35.700000Z - 2013-09-01T04:12:05.650000Z \
| 20.0 Hz, 1800 samples
    .. rubric:: Example, high-pass
    >>> from obspy import read
    >>> from eqcorrscan.utils.pre_processing import shortproc
    >>> st = read('eqcorrscan/tests/test_data/WAV/TEST_/' +
    ...           '2013-09-01-0410-35.DFDPC_024_00')
    >>> st = shortproc(st=st, lowcut=2, highcut=None, filt_order=3,
    ...                samp_rate=20, debug=0)
    >>> print(st[0])
    AF.LABE..SHZ | 2013-09-01T04:10:35.700000Z - 2013-09-01T04:12:05.650000Z \
| 20.0 Hz, 1800 samples
    """
    # Accept a bare Trace as well as a Stream; remember which was given so
    # the same container type can be handed back.
    if isinstance(st, Trace):
        tracein = True
        st = Stream(st)
    else:
        tracein = False
    # Add sanity check for filter
    if highcut and highcut >= 0.5 * samp_rate:
        raise IOError('Highcut must be lower than the nyquist')
    # High debug levels plot traces, which cannot be done from workers.
    if debug > 4:
        parallel = False
    if starttime is not None and endtime is not None:
        for tr in st:
            tr.trim(starttime, endtime)
            # If trimming left one sample too many, drop the first sample
            # by convention.
            if len(tr.data) == ((endtime - starttime) *
                                tr.stats.sampling_rate) + 1:
                tr.data = tr.data[1:len(tr.data)]
    elif starttime:
        for tr in st:
            tr.trim(starttime=starttime)
    elif endtime:
        for tr in st:
            tr.trim(endtime=endtime)
    # BUG FIX: iterate over a static copy - removing traces from the Stream
    # while iterating it skips the trace following each removal, which could
    # leave empty traces in the output.
    for tr in list(st):
        if len(tr.data) == 0:
            st.remove(tr)
            print('No data for %s.%s after trim' %
                  (tr.stats.station, tr.stats.channel))
    if parallel:
        if not num_cores:
            num_cores = cpu_count()
        pool = Pool(processes=num_cores)
        results = [pool.apply_async(process, (tr,), {
            'lowcut': lowcut, 'highcut': highcut, 'filt_order': filt_order,
            'samp_rate': samp_rate, 'debug': debug, 'starttime': False,
            'clip': False, 'seisan_chan_names': seisan_chan_names})
            for tr in st]
        pool.close()
        stream_list = [p.get() for p in results]
        pool.join()
        st = Stream(stream_list)
    else:
        # process() mutates each trace in place, so the return value can be
        # discarded here.
        for tr in st:
            process(tr=tr, lowcut=lowcut, highcut=highcut,
                    filt_order=filt_order, samp_rate=samp_rate, debug=debug,
                    starttime=False, clip=False,
                    seisan_chan_names=seisan_chan_names)
    if tracein:
        st.merge()
        return st[0]
    return st
def dayproc(st, lowcut, highcut, filt_order, samp_rate, starttime, debug=0,
            parallel=True, num_cores=False, ignore_length=False,
            seisan_chan_names=False):
    """
    Wrapper for dayproc to parallel multiple traces in a stream.
    Works in place on data. This is employed to ensure all parts of the data \
    are processed in the same way.
    :type st: obspy.core.stream.Stream
    :param st: Stream to process (can be trace).
    :type lowcut: float
    :param lowcut: Low cut in Hz for bandpass.
    :type highcut: float
    :param highcut: High cut in Hz for bandpass.
    :type filt_order: int
    :param filt_order: Corners for bandpass.
    :type samp_rate: float
    :param samp_rate: Desired sampling rate in Hz.
    :type starttime: obspy.core.utcdatetime.UTCDateTime
    :param starttime: Desired start-date of trace.
    :type debug: int
    :param debug: Debug output level from 0-5, higher numbers = more output.
    :type parallel: bool
    :param parallel:
        Set to True to process traces in parallel, this is often faster than
        serial processing of traces: defaults to True.
    :type num_cores: int
    :param num_cores:
        Control the number of cores for parallel processing, if set to False
        then this will use all the cores.
    :type ignore_length: bool
    :param ignore_length: See warning below.
    :type seisan_chan_names: bool
    :param seisan_chan_names:
        Whether channels are named like seisan channels (which are two letters
        rather than SEED convention of three) - defaults to True.
    :return: Processed stream.
    :rtype: :class:`obspy.core.stream.Stream`
    .. note:: Will convert channel names to two characters long.
    .. warning::
        Will fail if data are less than 19.2 hours long - this number is
        arbitrary and is chosen to alert the user to the dangers of padding
        to day-long, if you don't care you can ignore this error by setting
        `ignore_length=True`. Use this option at your own risk! It will also
        warn any-time it has to pad data - if you see strange artifacts in your
        detections, check whether the data have gaps.
    """
    # Accept either a Trace or a Stream; remember which so the matching
    # container type is returned.
    tracein = isinstance(st, Trace)
    if tracein:
        st = Stream(st)
    # Sanity check the filter band against the Nyquist frequency.
    if highcut and highcut >= 0.5 * samp_rate:
        raise IOError('Highcut must be lower than the nyquist')
    # Verbose debugging plots traces, which cannot be done from workers.
    if debug > 4:
        parallel = False
    # Identical processing arguments for both the parallel and serial paths.
    proc_kwargs = {
        'lowcut': lowcut, 'highcut': highcut, 'filt_order': filt_order,
        'samp_rate': samp_rate, 'debug': debug, 'starttime': starttime,
        'clip': True, 'ignore_length': ignore_length, 'length': 86400,
        'seisan_chan_names': seisan_chan_names}
    if parallel:
        pool = Pool(processes=num_cores if num_cores else cpu_count())
        async_results = [pool.apply_async(process, (tr,), proc_kwargs)
                         for tr in st]
        pool.close()
        processed = [res.get() for res in async_results]
        pool.join()
        st = Stream(processed)
    else:
        # process() mutates traces in place; the return value is not needed.
        for tr in st:
            process(tr=tr, **proc_kwargs)
    if tracein:
        st.merge()
        return st[0]
    return st
def process(tr, lowcut, highcut, filt_order, samp_rate, debug,
            starttime=False, clip=False, length=86400,
            seisan_chan_names=False, ignore_length=False):
    """
    Basic function to process data, usually called by dayproc or shortproc.
    Functionally, this will bandpass, downsample and check headers and length
    of trace to ensure files start at the start of a day and are daylong.
    This is a simple wrapper on obspy functions, we include it here to provide
    a system to ensure all parts of the dataset are processed in the same way.
    .. note:: Usually this function is called via dayproc or shortproc.
    :type tr: obspy.core.trace.Trace
    :param tr: Trace to process
    :type lowcut: float
    :param lowcut: Low cut in Hz, if set to None and highcut is set, will use \
        a lowpass filter.
    :type highcut: float
    :param highcut: High cut in Hz, if set to None and lowcut is set, will \
        use a highpass filter.
    :type filt_order: int
    :param filt_order: Number of corners for filter.
    :type samp_rate: float
    :param samp_rate: Desired sampling rate in Hz.
    :type debug: int
    :param debug: Debug output level from 0-5, higher numbers = more output.
    :type starttime: obspy.core.utcdatetime.UTCDateTime
    :param starttime: Desired start of trace
    :type clip: bool
    :param clip: Whether to expect, and enforce a set length of data or not.
    :type length: float
    :param length: Use to set a fixed length for data from the given starttime.
    :type seisan_chan_names: bool
    :param seisan_chan_names:
        Whether channels are named like seisan channels (which are two letters
        rather than SEED convention of three) - defaults to True.
    :type ignore_length: bool
    :param ignore_length: See warning in dayproc.
    :return: Processed trace.
    :rtype: :class:`obspy.core.trace.Trace`
    """
    # Add sanity check
    if highcut and highcut >= 0.5 * samp_rate:
        raise IOError('Highcut must be lower than the nyquist')
    # Define the start-time
    if starttime:
        # Be nice and allow a datetime object.
        if isinstance(starttime, dt.date) or isinstance(starttime,
                                                        dt.datetime):
            starttime = UTCDateTime(starttime)
        day = starttime.date
    else:
        day = tr.stats.starttime.date
    if debug >= 2:
        print('Working on: ' + tr.stats.station + '.' + tr.stats.channel)
    if debug >= 5:
        tr.plot()
    # Do a brute force quality check
    qual = _check_daylong(tr)
    if not qual:
        # BUG FIX: these strings were previously separated by a comma, making
        # msg a tuple, so the ValueError carried a tuple rather than a string.
        msg = ("Data have more zeros than actual data, please check the raw"
               " data set-up and manually sort it: " + tr.stats.station +
               "." + tr.stats.channel)
        raise ValueError(msg)
    tr = tr.detrend('simple')
    # Detrend data before filtering
    if debug > 0:
        print('I have ' + str(len(tr.data)) + ' data points for ' +
              tr.stats.station + '.' + tr.stats.channel +
              ' before processing')
    # Sanity check to ensure files are daylong
    if float(tr.stats.npts / tr.stats.sampling_rate) != length and clip:
        if debug >= 2:
            print('Data for ' + tr.stats.station + '.' + tr.stats.channel +
                  ' are not of daylong length, will zero pad')
        if tr.stats.endtime - tr.stats.starttime < 0.8 * length\
           and not ignore_length:
            msg = ('Data for %s.%s is %i hours long, which is less than 0.8 '
                   'of the desired length, will not pad' %
                   (tr.stats.station, tr.stats.channel,
                    (tr.stats.endtime - tr.stats.starttime) / 3600))
            raise NotImplementedError(msg)
        # Use obspy's trim function with zero padding
        tr = tr.trim(starttime, starttime + length, pad=True, fill_value=0,
                     nearest_sample=True)
        # If there is one sample too many after this remove the first one
        # by convention
        if len(tr.data) == (length * tr.stats.sampling_rate) + 1:
            tr.data = tr.data[1:len(tr.data)]
        if not tr.stats.sampling_rate * length == tr.stats.npts:
            raise ValueError('Data are not daylong for ' +
                             tr.stats.station + '.' + tr.stats.channel)
        # BUG FIX: this diagnostic previously printed unconditionally; gate
        # it behind the debug level like every other diagnostic here.
        if debug >= 2:
            print('I now have %i data points after enforcing length'
                  % len(tr.data))
    # Check sampling rate and resample
    if tr.stats.sampling_rate != samp_rate:
        if debug >= 2:
            print('Resampling')
        tr.resample(samp_rate)
    # Filtering section
    tr = tr.detrend('simple')    # Detrend data again before filtering
    if highcut and lowcut:
        if debug >= 2:
            print('Bandpassing')
        tr.data = bandpass(tr.data, lowcut, highcut,
                           tr.stats.sampling_rate, filt_order, True)
    elif highcut:
        if debug >= 2:
            print('Lowpassing')
        tr.data = lowpass(tr.data, highcut, tr.stats.sampling_rate,
                          filt_order, True)
    elif lowcut:
        if debug >= 2:
            print('Highpassing')
        tr.data = highpass(tr.data, lowcut, tr.stats.sampling_rate,
                           filt_order, True)
    else:
        warnings.warn('No filters applied')
    # Account for two letter channel names in s-files and therefore templates
    if seisan_chan_names:
        tr.stats.channel = tr.stats.channel[0] + tr.stats.channel[-1]
    # Sanity check the time header
    # BUG FIX: previously compared tr.stats.starttime.day (an int) against
    # day (a datetime.date), which is never equal, so the warning fired for
    # every clipped trace; compare dates instead.
    if tr.stats.starttime.date != day and clip:
        warnings.warn("Time headers do not match expected date: " +
                      str(tr.stats.starttime))
    # Sanity check to ensure files are daylong
    if float(tr.stats.npts / tr.stats.sampling_rate) != length and clip:
        if debug >= 2:
            print('Data for ' + tr.stats.station + '.' + tr.stats.channel +
                  ' is not of daylong length, will zero pad')
        # Use obspy's trim function with zero padding
        tr = tr.trim(starttime, starttime + length, pad=True, fill_value=0,
                     nearest_sample=True)
        # If there is one sample too many after this remove the first one
        # by convention
        if len(tr.data) == (length * tr.stats.sampling_rate) + 1:
            tr.data = tr.data[1:len(tr.data)]
        if not tr.stats.sampling_rate * length == tr.stats.npts:
            raise ValueError('Data are not daylong for ' +
                             tr.stats.station + '.' + tr.stats.channel)
    # Final visual check for debug
    if debug > 4:
        tr.plot()
    return tr
if __name__ == "__main__":
    # Run the doctests embedded in this module's docstrings when the file is
    # executed directly as a script.
    import doctest
    doctest.testmod()