"""
.. warning:: if using only ``mov_stack`` = 1, no MWCS job is inserted in the
    database and, consequently, no MWCS calculation will be done! FIX!

Following Clarke et al. (2011), we apply the :ref:`mwcs` to study the relative
dephasing between Moving-Window stacks ("Current") and a Reference using
Moving-Window Cross-Spectral analysis. The *jobs* "T"o do have been inserted
in the database during the stack procedure.

Filter Configuration Parameters
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
* ``mwcs_low``: The lower frequency bound of the linear regression done in
MWCS (in Hz)
* ``mwcs_high``: The upper frequency bound of the linear regression done in
MWCS (in Hz)
* ``mwcs_wlen``: Window length (in seconds) to perform MWCS
* ``mwcs_step``: Step (in seconds) of the windowing procedure in MWCS
* |hpc| | *new in 1.6*
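
These four values are stored per filter in the database. A quick way to check
which values will actually be used is sketched below; it relies on the same
``connect`` and ``get_filters`` API calls as the script in this module, and the
plain ``print`` output formatting is only illustrative:

.. code-block:: python

    from msnoise.api import connect, get_filters

    db = connect()
    for f in get_filters(db, all=False):
        # One line per filter: id, MWCS frequency band, window length and step
        print(f.ref, f.mwcs_low, f.mwcs_high, f.mwcs_wlen, f.mwcs_step)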
In short, both time series are sliced into several overlapping windows and
preprocessed. The similarity of the two time series is assessed using the
cross-coherence between energy densities in the frequency domain. The time
delay between the two cross-correlations appears in the unwrapped phase of the
cross spectrum, which is linearly proportional to frequency. The "Delay" for
each window is therefore obtained as the slope of a weighted linear regression
(WLS) of the phase samples within the frequency band of interest.
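
The core measurement on a single pair of windows can be sketched as follows.
This is a simplified illustration of the phase-slope idea only, not the actual
implementation found in ``msnoise.move2obspy.mwcs``; the function name,
arguments and the crude amplitude weighting are illustrative:

.. code-block:: python

    import numpy as np

    def phase_slope_delay(cur_win, ref_win, sampling_rate, freq_low, freq_high):
        # Cross spectrum of the "current" and "reference" windows
        freqs = np.fft.rfftfreq(len(cur_win), d=1.0 / sampling_rate)
        cross = np.fft.rfft(cur_win) * np.conj(np.fft.rfft(ref_win))
        phase = np.unwrap(np.angle(cross))
        # Keep only the frequency band of interest (mwcs_low / mwcs_high)
        band = (freqs >= freq_low) & (freqs <= freq_high)
        # Weighted regression of phase vs. angular frequency, forced through
        # the origin: the slope is the time delay between the two windows
        # (up to a sign convention). The cross-spectral amplitude is used
        # here as a crude weight.
        w = np.abs(cross)[band]
        x = 2.0 * np.pi * freqs[band]
        y = phase[band]
        return np.sum(w * x * y) / np.sum(w * x * x)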
For each filter, the frequency band can be configured using ``mwcs_low``
and ``mwcs_high``, and the window and overlap lengths using ``mwcs_wlen`` and
``mwcs_step``.
The output of this process is a table of delays, one measurement per window
along the cross-correlation functions. The following is an example for lag
times between -115 and -90 seconds. In this case, the window length was
10 seconds with an overlap of 5 seconds (i.e. a 5 second step):

.. code-block:: text

    LAG_TIME DELAY ERROR MEAN COHERENCE
    -1.1500000000e+02 -1.4781146383e-01 5.3727119135e-02 2.7585243911e-01
    -1.1000000000e+02 -6.8207526992e-02 2.0546644311e-02 3.1620999352e-01
    -1.0500000000e+02 -1.0337029577e-01 8.6645155402e-03 4.2439269880e-01
    -1.0000000000e+02 -2.8668775696e-02 6.2522215988e-03 5.7159849528e-01
    -9.5000000000e+01 4.1803941008e-02 1.5102285789e-02 4.1238557789e-01
    -9.0000000000e+01 4.8139400233e-02 3.2700657018e-02 3.0586187792e-01
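
These per-day files are written with ``np.savetxt`` and can be read back
directly. The path below is purely illustrative (the actual layout is
``MWCS/<filter id>/<mov_stack>_DAYS/<components>/<pair>/<day>.txt``):

.. code-block:: python

    import numpy as np

    # Hypothetical example path following the MWCS output folder layout
    fn = "MWCS/01/001_DAYS/ZZ/YA.UV01_YA.UV02/2013-01-01.txt"
    # Columns: lag time, delay, delay error, mean coherence
    lag_time, delay, error, coherence = np.loadtxt(fn).T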
This process is job-based, so it is possible to run several instances in
parallel.

Once done, each job is marked "D"one in the database and, unless ``hpc`` is
``Y``, DTT jobs are inserted/updated in the database.

To run this step:

.. code-block:: sh

    $ msnoise cc dvv compute_mwcs
This step also supports parallel processing/threading:

.. code-block:: sh

    $ msnoise -t 4 cc dvv compute_mwcs

will start 4 instances of the code (after a 1 second delay to avoid database
conflicts). This works with both SQLite and MySQL, but be aware that problems
could occur with SQLite.

.. versionadded:: 1.4
    Parallel Processing
"""
from .api import *
from .move2obspy import mwcs
import logbook
def main(loglevel="INFO"):
    logger = logbook.Logger(__name__)
    # Reconfigure logger to show the pid number in log records
    logger = get_logger('msnoise.compute_mwcs_child', loglevel,
                        with_pid=True)
    logger.info('*** Starting: Compute MWCS ***')
    db = connect()
    params = get_params(db)
    export_format = get_config(db, 'export_format')
    if export_format == "BOTH":
        extension = ".MSEED"
    else:
        extension = "." + export_format
    mov_stacks = params.mov_stack
    goal_sampling_rate = float(get_config(db, "cc_sampling_rate"))
    maxlag = float(get_config(db, "maxlag"))
    # First we reset all DTT jobs to "T"odo if the REF is new for a given pair
    # for station1, station2 in get_station_pairs(db, used=True):
    #     sta1 = "%s.%s" % (station1.net, station1.sta)
    #     sta2 = "%s.%s" % (station2.net, station2.sta)
    #     pair = "%s:%s" % (sta1, sta2)
    #     if is_dtt_next_job(db, jobtype='DTT', ref=pair):
    #         logger.info(
    #             "We will recompute all MWCS based on the new REF for %s" % pair)
    #         reset_dtt_jobs(db, pair)
    #         update_job(db, "REF", pair, jobtype='DTT', flag='D')
    #
    logger.debug('Ready to compute')
    # Then we compute the jobs
    outfolders = []
    filters = get_filters(db, all=False)
    # Random start-up delay (up to 5 s) so parallel instances do not hit the
    # database at exactly the same time
    time.sleep(np.random.random() * 5)
    while is_dtt_next_job(db, flag='T', jobtype='MWCS'):
        # TODO would it be possible to make the next 8 lines in the API?
        jobs = get_dtt_next_job(db, flag='T', jobtype='MWCS')

        if not len(jobs):
            # edge case, should only occur when is_next returns true, but
            # get_next receives no jobs (heavily parallelised calls).
            time.sleep(np.random.random())
            continue
        pair = jobs[0].pair
        refs, days = zip(*[[job.ref, job.day] for job in jobs])

        logger.info(
            "There are MWCS jobs for some days to recompute for %s" % pair)
        for f in filters:
            filterid = int(f.ref)
            for components in params.all_components:
                ref_name = pair.replace(':', '_')
                station1, station2 = pair.split(":")
                ref = get_ref(db, station1, station2, filterid, components,
                              params)
                if not len(ref):
                    logger.debug("No REF file found for %s.%i.%s, skipping." %
                                 (ref_name, filterid, components))
                    continue
                ref = ref.data
                for mov_stack in mov_stacks:
                    n, data = get_results(db, station1, station2, filterid,
                                          components, days, mov_stack,
                                          format="matrix", params=params)
                    for i, cur in enumerate(data):
                        if np.all(np.isnan(cur)):
                            continue
                        logger.debug(
                            'Processing MWCS for: %s.%s.%02i - %s - %02i days' %
                            (ref_name, components, filterid, days[i], mov_stack))
                        output = mwcs(cur, ref, f.mwcs_low, f.mwcs_high,
                                      goal_sampling_rate, -maxlag,
                                      f.mwcs_wlen, f.mwcs_step)
                        # One text file per day: columns are lag time, delay,
                        # delay error and mean coherence
                        outfolder = os.path.join(
                            'MWCS', "%02i" % filterid,
                            "%03i_DAYS" % mov_stack, components, ref_name)
                        if outfolder not in outfolders:
                            if not os.path.isdir(outfolder):
                                os.makedirs(outfolder)
                            outfolders.append(outfolder)
                        np.savetxt(os.path.join(outfolder,
                                                "%s.txt" % str(days[i])),
                                   output)
                        del output
                    del data

        # THIS SHOULD BE IN THE API
        # Mark this batch of MWCS jobs "D"one; unless running on HPC, flag the
        # corresponding DTT jobs back to "T"odo so the next step picks them up
        massive_update_job(db, jobs, "D")
        if not params.hpc:
            for job in jobs:
                update_job(db, job.day, job.pair, 'DTT', 'T')

    logger.info('*** Finished: Compute MWCS ***')