-
Notifications
You must be signed in to change notification settings - Fork 81
/
stretch.py
218 lines (186 loc) · 9.6 KB
/
stretch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
"""
Stretching...
"""
from .api import *
from numpy import asarray as ar
from scipy.optimize import curve_fit
from scipy.ndimage import map_coordinates
def stretch_mat_creation(refcc, str_range=0.01, nstr=1001):
    """Build a matrix of stretched instances of a reference trace.

    The reference trace is stretched using a cubic spline interpolation
    algorithm from ``-str_range`` to ``+str_range`` (relative amount)
    for a total of ``nstr`` steps.

    The output of this function is a matrix containing the stretched
    versions of the reference trace (one per row) (``strrefmat``) and
    the corresponding stretching amounts (``strvec``).

    :type refcc: :class:`~numpy.ndarray`
    :param refcc: 1d ndarray. The reference trace that will be stretched
    :type str_range: float
    :param str_range: Amount of the desired stretching (one side)
    :type nstr: int
    :param nstr: Total number of stretching steps

    :rtype: tuple of :class:`~numpy.ndarray`
    :return: **strrefmat**: 2d ndarray of shape ``(nstr, len(refcc))``,
        each row being one stretched version of the reference trace
    :return: **strvec**: 1d ndarray of length ``nstr``, the stretch
        factor applied to the corresponding row of ``strrefmat``
    """
    n = len(refcc)
    # Sample indices centred on the middle of the trace so stretching
    # is symmetric around the central (zero-lag) sample.
    samples_idx = np.arange(n) - n // 2
    strvec = 1 + np.linspace(-str_range, str_range, nstr)
    str_timemat = np.zeros((nstr, n))
    for ii in range(nstr):
        # Rows are filled in reverse order with respect to strvec:
        # row ii corresponds to stretch factor strvec[nstr - 1 - ii].
        str_timemat[ii, :] = samples_idx / strvec[nstr - 1 - ii]
    strrefmat = np.zeros_like(str_timemat)
    coord = np.zeros((2, n))
    for i, row in enumerate(str_timemat):
        # Shift back so coordinates index into the (n, 1) input array;
        # the second coordinate row stays 0 (single column).
        coord[0, :] = row + n // 2
        # map_coordinates resamples the reference trace at the
        # stretched positions (cubic spline by default).
        strrefmat[i, :] = map_coordinates(refcc.reshape((n, 1)), coord)
    return strrefmat, strvec
def main(loglevel="INFO"):
logger = get_logger('msnoise.compute_psd_child', loglevel,
with_pid=True)
logger.info('*** Starting: Compute STR ***')
db = connect()
params = get_params(db)
mov_stacks = params.mov_stack
goal_sampling_rate = params.cc_sampling_rate
maxlag = params.maxlag
export_format = params.export_format
if export_format == "BOTH":
extension = ".MSEED"
else:
extension = "."+export_format
# First we reset all DTT jobs to "T"odo if the REF is new for a given pair
# for station1, station2 in get_station_pairs(db, used=True):
# sta1 = "%s.%s" % (station1.net, station1.sta)
# sta2 = "%s.%s" % (station2.net, station2.sta)
# pair = "%s:%s" % (sta1, sta2)
# if is_dtt_next_job(db, jobtype='DTT', ref=pair):
# logging.info(
# "We will recompute all STR based on the new REF for %s" % pair)
# reset_dtt_jobs(db, pair)
# update_job(db, "REF", pair, jobtype='DTT', flag='D')
filters = get_filters(db, all=False)
params = get_params(db)
str_range = params.stretching_max
nstr = params.stretching_nsteps
# Then we compute the jobs
while is_dtt_next_job(db, flag='T', jobtype='MWCS'):
jobs = get_dtt_next_job(db, flag='T', jobtype='MWCS')
if not len(jobs):
# edge case, should only occur when is_next returns true, but
# get_next receives no jobs (heavily parallelised calls).
time.sleep(np.random.random())
continue
pair = jobs[0].pair
refs, days = zip(*[[job.ref, job.day] for job in jobs])
logging.info(
"There are STR (MWCS) jobs for some days to recompute for %s" % pair)
ref_name = pair.replace(':', '_')
station1, station2 = pair.split(":")
dtt_lag = get_config(db, "dtt_lag")
dtt_v = float(get_config(db, "dtt_v"))
dtt_minlag = float(get_config(db, "dtt_minlag"))
dtt_width = float(get_config(db, "dtt_width"))
dtt_sides = get_config(db, "dtt_sides")
if dtt_lag == "static":
minlag = dtt_minlag
else:
minlag = get_interstation_distance(station1, station2, station1.coordinates) / dtt_v
maxlag2 = minlag + dtt_width
logger.debug("betweeen", minlag, "and", maxlag2)
for f in get_filters(db, all=False):
filterid = int(f.ref)
for mov_stack in mov_stacks:
for components in params.all_components:
rf = os.path.join("STACKS", "%02i" %
filterid, "REF", components, ref_name + extension)
if os.path.isfile(rf):
ref = get_ref(db, station1, station2, filterid, components, params).data
mid = int(goal_sampling_rate*maxlag)
ref[mid-int(minlag*goal_sampling_rate):mid+int(minlag*goal_sampling_rate)] *= 0.
ref[:mid-int(maxlag2*goal_sampling_rate)] *= 0.
ref[mid+int(maxlag2*goal_sampling_rate):] *= 0.
else:
logging.debug(
"No REF file named %s, skipping." % rf)
continue
alldays = []
alldeltas = []
allcoefs = []
allerrs = []
ref_stretched, deltas = stretch_mat_creation(ref,
str_range=str_range,
nstr=nstr)
n, data = get_results(db, station1, station2, filterid, components, days, mov_stack, format="matrix",params=params)
for i, cur in enumerate(data):
if np.all(np.isnan(cur)):
continue
cur[mid-int(minlag*goal_sampling_rate):mid+int(minlag*goal_sampling_rate)] *= 0.
cur[:mid-int(maxlag2*goal_sampling_rate)] *= 0.
cur[mid+int(maxlag2*goal_sampling_rate):] *= 0. ### replace with zeroes at all
### times outside minlag to maxlag
logging.debug(
'Processing Stretching for: %s.%s.%02i - %s - %02i days' %
(ref_name, components, filterid, days[i], mov_stack))
coeffs =[]
for j in range(ref_stretched.shape[0]):
ci = np.corrcoef(cur,ref_stretched[j])[0,1]
coeffs.append(ci)
tday = datetime.datetime.strptime(days[i], "%Y-%m-%d")
alldays.append(tday)
alldeltas.append(deltas[np.argmax(coeffs)])
allcoefs.append(np.max(coeffs))
###### gaussian fit ######
def gauss_function(x, a, x0, sigma):
return a*np.exp(-(x-x0)**2/(2*sigma**2))
x = ar(range(len(coeffs)))
ymax_index = coeffs.index(np.max(coeffs))
ymin = np.min(coeffs)
coeffs_shift = []
for j in coeffs:
j += np.absolute(ymin) # make all points above zero
coeffs_shift.append(j)
n = len(coeffs)
x0 = sum(x)/n
sigma = (sum((x-x0)**2)/n)**0.5
try:
popt, pcov = curve_fit(gauss_function, x, coeffs_shift, [ymax_index, x0, sigma])
FWHM = 2 * ((2*np.log(2))**0.5)*popt[2] # convert sigma (popt[2]) to FWHM
error = FWHM / 2 ### error is half width at full maximum
except RuntimeError:
error = np.nan # gaussian fit failed
allerrs.append(error)
df = pd.DataFrame(np.array([alldeltas,allcoefs,allerrs]).T, index=alldays, columns=["Delta", "Coeff", "Error"],)
output = os.path.join('STR', "%02i" % filterid, "%03i_DAYS" % mov_stack, components)
if not os.path.isdir(output):
os.makedirs(output)
fn = os.path.join(output, "%s.csv" % ref_name)
if not os.path.isfile(fn):
df.to_csv(fn, index_label="Date")
else:
dest = pd.read_csv(fn, index_col=0,
parse_dates=True)
final = pd.concat([dest, df])
final = final[~final.index.duplicated(keep='last')]
final = final.sort_index()
final.to_csv(fn, index_label="Date")
# df.to_csv(os.path.join(output, "%s.csv" % ref_name), index_label="Date")
# THIS SHOULD BE IN THE API
updated = False
mappings = [{'ref': job.ref, 'flag': "D"} for job in jobs]
while not updated:
try:
db.bulk_update_mappings(Job, mappings)
db.commit()
updated = True
except:
time.sleep(np.random.random())
pass
logging.info('*** Finished: Compute STR ***')
if __name__ == "__main__":
main()