Skip to content

Commit

Permalink
Incorporate Location-code Preferences in Cross-Correlation Workflow (#…
Browse files Browse the repository at this point in the history
…231)

* Ability to Set Location-code Preferences

* The cross-correlation workflow now allows users the ability to set
  location-code preferences for stations featuring data from multiple
  location codes
* Added tests

* Updated dependencies in readme

* Fixing a GPS clock-correction bug
  • Loading branch information
geojunky committed Dec 22, 2021
1 parent d02ede4 commit 80dad9c
Show file tree
Hide file tree
Showing 14 changed files with 209 additions and 58 deletions.
30 changes: 17 additions & 13 deletions seismic/ASDFdatabase/_FederatedASDFDataSetImpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,29 +223,33 @@ def _get_correction(self, net, sta, st, et):
#end func

def _apply_correction(self, stream):

def day_split(trc):
stream = Stream()
step = 24*3600 # seconds
rstream = Stream()
daySeconds = 24 * 3600 # seconds

st = trc.stats.starttime
et = trc.stats.endtime
dayAlignedStartTime = UTCDateTime(year=st.year, month=st.month, day=st.day)
ct = dayAlignedStartTime

dayAlignedEndTime = dayAlignedStartTime + daySeconds

ct = st
trcCopy = trc.copy()
while(ct < et):
if(ct + step > et):
step = et - ct
while (ct < et):
step = daySeconds

if (ct + step > dayAlignedEndTime):
step = dayAlignedEndTime - ct
# end if
stream += trcCopy.slice(ct, ct + step)

rstream += trcCopy.slice(ct, ct + step, nearest_sample=False)

ct += step
dayAlignedEndTime += daySeconds
# wend

return stream
#end func
return rstream
# end func

resultStream = Stream()
for mtr in stream:
Expand Down
2 changes: 2 additions & 0 deletions seismic/xcorqc/Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ From a login node on Gadi:
3. `pip3.6 install netCDF4==1.4.0 --user`
4. `pip3.6 install pyasdf==0.5.1 --user`
5. `pip3.6 install ordered_set ujson psutil --user`
6. `pip3.6 install pandas==1.1.5 --user`
7. `pip3.6 install Rtree==0.9.7 --user`

Date last validated: 10 August 2021

Expand Down
75 changes: 58 additions & 17 deletions seismic/xcorqc/correlator.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ def __init__(self, asdf_file_name, netsta_list='*'):
np.radians(self.metadata[netsta][0])])
# end for

if(len(rtps) == 0):
assert 0, 'No station-pairs found due to missing stations. Aborting..'
# end if

rtps = np.array(rtps)
xyzs = rtp2xyz(rtps[:, 0], rtps[:, 1], rtps[:, 2])

Expand Down Expand Up @@ -126,14 +130,38 @@ def get_unique_station_pairs(self, other_dataset, nn=1):
# end func
# end class

def read_location_preferences(location_preferences_fn):
result = defaultdict(lambda: None)

if(location_preferences_fn):
pref_list = open(location_preferences_fn, 'r').readlines()

for pref in pref_list:
pref = pref.strip()
if (len(pref)):
try:
netsta, loc = pref.split()
net, sta = netsta.split('.')

result[netsta] = loc
except Exception as e:
print(str(e))
assert 0, 'Error parsing: {}'.format(pref)
# end try
# end if
# end for
# end if
return result
# end func

def process(data_source1, data_source2, output_path,
interval_seconds, window_seconds, window_overlap, window_buffer_length,
resample_rate=None, taper_length=0.05, nearest_neighbours=1,
fmin=None, fmax=None, netsta_list1='*', netsta_list2='*', pairs_to_compute=None,
start_time='1970-01-01T00:00:00', end_time='2100-01-01T00:00:00',
instrument_response_inventory=None, instrument_response_output='vel', water_level=50,
clip_to_2std=False, whitening=False, whitening_window_frequency=0,
one_bit_normalize=False, read_buffer_size=10,
one_bit_normalize=False, read_buffer_size=1, location_preferences=None,
ds1_zchan=None, ds1_nchan=None, ds1_echan=None,
ds2_zchan=None, ds2_nchan=None, ds2_echan=None, corr_chan=None,
envelope_normalize=False, ensemble_stack=False, restart=False, dry_run=False,
Expand Down Expand Up @@ -162,6 +190,7 @@ def process(data_source1, data_source2, output_path,
ds2 = Dataset(data_source2, netsta_list2)

proc_stations = []
location_preferences_dict = None
time_tag = None
if (rank == 0):
# Register time tag with high resolution, since queued jobs can readily
Expand Down Expand Up @@ -227,12 +256,17 @@ def cull_pairs(pairs, keep_list_fn):
for keep_pair in keep_list:
keep_pair = keep_pair.strip()
if(len(keep_pair)):
knet1, ksta1, knet2, ksta2 = keep_pair.split('.')
try:
knet1, ksta1, knet2, ksta2 = keep_pair.split('.')

keep_pair_alt = '%s.%s.%s.%s'%(knet2, ksta2, knet1, ksta1)
keep_pair_alt = '%s.%s.%s.%s'%(knet2, ksta2, knet1, ksta1)

if(keep_pair in pairs_set or keep_pair_alt in pairs_set):
result.add(('%s.%s'%(knet1, ksta1), '%s.%s'%(knet2, ksta2)))
if(keep_pair in pairs_set or keep_pair_alt in pairs_set):
result.add(('%s.%s'%(knet1, ksta1), '%s.%s'%(knet2, ksta2)))
except Exception as e:
print(str(e))
assert 0, 'Error parsing: {}'.format(keep_pair)
# end try
# end if
# end for

Expand Down Expand Up @@ -265,6 +299,7 @@ def cull_pairs(pairs, keep_list_fn):
return
# end if

location_preferences_dict = read_location_preferences(location_preferences)
# broadcast workload to all procs
proc_stations = comm.bcast(proc_stations, root=0)
time_tag = comm.bcast(time_tag, root=0)
Expand Down Expand Up @@ -295,8 +330,8 @@ def cull_pairs(pairs, keep_list_fn):
continue
# end if

netsta1inv, stationInvCache = getStationInventory(inv, stationInvCache, netsta1)
netsta2inv, stationInvCache = getStationInventory(inv, stationInvCache, netsta2)
netsta1inv, stationInvCache = getStationInventory(inv, stationInvCache, netsta1, location_preferences_dict)
netsta2inv, stationInvCache = getStationInventory(inv, stationInvCache, netsta2, location_preferences_dict)

def evaluate_channels(cha1, cha2):
result = []
Expand Down Expand Up @@ -351,7 +386,7 @@ def evaluate_channels(cha1, cha2):
endTime, netsta1, netsta2, netsta1inv, netsta2inv,
instrument_response_output, water_level,
corr_chans[0], corr_chans[1],
baz_netsta1, baz_netsta2,
baz_netsta1, baz_netsta2, location_preferences_dict,
resample_rate, taper_length, read_buffer_size, interval_seconds,
window_seconds, window_overlap, window_buffer_length,
fmin, fmax, clip_to_2std, whitening, whitening_window_frequency,
Expand Down Expand Up @@ -437,6 +472,13 @@ def evaluate_channels(cha1, cha2):
type=int,
help="Data read buffer size; default is 1 x 'interval_seconds'. This parameter allows fetching data in bulk,"
" which can improve efficiency, but has no effect on the results produced")
@click.option('--location-preferences', default=None,
type=click.Path('r'),
help="A space-separated two-columned text file containing location code preferences for "
"stations in the form: 'NET.STA LOC'. Note that location code preferences need not "
"be provided for all stations -- the default is None. This approach allows "
"preferential selection of data from particular location codes for stations that "
"feature data from multiple location codes")
@click.option('--ds1-zchan', default='BHZ',
type=str,
help="Name of z-channel for data-source-1. This parameter and the five following are required to "
Expand Down Expand Up @@ -478,17 +520,16 @@ def evaluate_channels(cha1, cha2):
def main(data_source1, data_source2, output_path, interval_seconds, window_seconds, window_overlap,
window_buffer_length, resample_rate, taper_length, nearest_neighbours, fmin, fmax, station_names1,
station_names2, pairs_to_compute, start_time, end_time, instrument_response_inventory, instrument_response_output,
water_level, clip_to_2std, whitening, whitening_window_frequency, one_bit_normalize, read_buffer_size,
water_level, clip_to_2std, whitening, whitening_window_frequency, one_bit_normalize, read_buffer_size, location_preferences,
ds1_zchan, ds1_nchan, ds1_echan, ds2_zchan, ds2_nchan, ds2_echan, corr_chan, envelope_normalize,
ensemble_stack, restart, dry_run, no_tracking_tag, scratch_folder):
"""
:param data_source1: Text file containing paths to ASDF files
:param data_source2: Text file containing paths to ASDF files
:param output_path: Output folder
:param interval_seconds: Length of time window (s) over which to compute cross-correlations; e.g. 86400 for 1 day
:param window_seconds: Length of stacking window (s); e.g 3600 for an hour. INTERVAL_SECONDS must be a multiple of \
window_seconds
:param window_overlap: Window overlap fraction; e.g. 0.1 for 10% overlap
DATA_SOURCE1: Text file containing paths to ASDF files \n
DATA_SOURCE2: Text file containing paths to ASDF files \n
OUTPUT_PATH: Output folder \n
INTERVAL_SECONDS: Length of time window (s) over which to compute cross-correlations; e.g. 86400 for 1 day\n
WINDOW_SECONDS: Length of stacking window (s); e.g 3600 for an hour. WINDOW_SECONDS must be a factor of INTERVAL_SECONDS\n
WINDOQ_OVERLAP: Window overlap fraction; e.g. 0.1 for 10% overlap\n
"""

if(resample_rate): resample_rate = float(resample_rate)
Expand All @@ -498,7 +539,7 @@ def main(data_source1, data_source2, output_path, interval_seconds, window_secon
process(data_source1, data_source2, output_path, interval_seconds, window_seconds, window_overlap,
window_buffer_length, resample_rate, taper_length, nearest_neighbours, fmin, fmax, station_names1,
station_names2, pairs_to_compute, start_time, end_time, instrument_response_inventory, instrument_response_output,
water_level, clip_to_2std, whitening, whitening_window_frequency, one_bit_normalize, read_buffer_size,
water_level, clip_to_2std, whitening, whitening_window_frequency, one_bit_normalize, read_buffer_size, location_preferences,
ds1_zchan, ds1_nchan, ds1_echan, ds2_zchan, ds2_nchan, ds2_echan, corr_chan, envelope_normalize,
ensemble_stack, restart, dry_run, no_tracking_tag, scratch_folder)
# end func
Expand Down
20 changes: 12 additions & 8 deletions seismic/xcorqc/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ def xyz2rtp(x, y, z):
tmp1 = x * x + y * y
tmp2 = tmp1 + z * z
rout[0] = np.sqrt(tmp2)
rout[1] = np.arctan2(sqrt(tmp1), z)
rout[1] = np.arctan2(np.sqrt(tmp1), z)
rout[2] = np.arctan2(y, x)
return rout
# end func

def getStationInventory(master_inventory, inventory_cache, netsta):
def getStationInventory(master_inventory, inventory_cache, netsta, location_preferences_dict):
netstaInv = None
if (master_inventory):
if (inventory_cache is None): inventory_cache = defaultdict(list)
Expand All @@ -37,7 +37,7 @@ def getStationInventory(master_inventory, inventory_cache, netsta):
if (isinstance(inventory_cache[netsta], Inventory)):
netstaInv = inventory_cache[netsta]
else:
inv = master_inventory.select(network=net, station=sta)
inv = master_inventory.select(network=net, station=sta, location=location_preferences_dict[netsta])
if(len(inv.networks)):
inventory_cache[netsta] = inv
netstaInv = inv
Expand Down Expand Up @@ -93,11 +93,13 @@ def fill_gaps(data, dt, max_gap_seconds=3):
return result
# end func

def _get_stream_00T(fds, net, sta, cha, start_time, end_time,
def _get_stream_00T(fds, net, sta, cha, start_time, end_time, location_preferences_dict,
baz=None, trace_count_threshold=200,
logger=None, verbose=1):

stations = fds.get_stations(start_time, end_time, network=net, station=sta)
netsta = net + '.' + sta
stations = fds.get_stations(start_time, end_time, network=net, station=sta,
location=location_preferences_dict[netsta])

stations_nch = [s for s in stations if 'N' == s[3][-1].upper() or '1' == s[3][-1]] # only N channels
stations_ech = [s for s in stations if 'E' == s[3][-1].upper() or '2' == s[3][-1]] # only E channels
Expand Down Expand Up @@ -153,15 +155,17 @@ def _get_stream_00T(fds, net, sta, cha, start_time, end_time,
return stt
# end func

def get_stream(fds, net, sta, cha, start_time, end_time,
def get_stream(fds, net, sta, cha, start_time, end_time, location_preferences_dict,
baz=None, trace_count_threshold=200,
logger=None, verbose=1):

if (cha == '00T'): return _get_stream_00T(fds, net, sta, cha, start_time, end_time,
if (cha == '00T'): return _get_stream_00T(fds, net, sta, cha, start_time, end_time, location_preferences_dict,
baz=baz, trace_count_threshold=trace_count_threshold,
logger=logger, verbose=verbose)
st = Stream()
stations = fds.get_stations(start_time, end_time, network=net, station=sta)
netsta = net + '.' + sta
stations = fds.get_stations(start_time, end_time, network=net, station=sta,
location=location_preferences_dict[netsta])
for codes in stations:
if (cha != codes[3]): continue
st += fds.get_waveforms(codes[0], codes[1], codes[2], codes[3], start_time,
Expand Down
21 changes: 17 additions & 4 deletions seismic/xcorqc/xcorqc.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,7 @@ def IntervalStackXCorr(refds, tempds,
temp_cha,
baz_ref_net_sta,
baz_temp_net_sta,
location_preferences_dict=defaultdict(lambda: None),
resample_rate=None,
taper_length=0.05,
buffer_seconds=864000, interval_seconds=86400,
Expand Down Expand Up @@ -516,6 +517,8 @@ def IntervalStackXCorr(refds, tempds,
:param baz_ref_net_sta: Back-azimuth of ref station from temp station in degrees
:type baz_temp_net_sta: float
:param baz_temp_net_sta: Back-azimuth of temp station from ref station in degrees
:type location_preferences_dict: defaultdict
:param location_preferences_dict: A defaultdict containing location code preferences, keyed by NET.STA
:type resample_rate: float
:param resample_rate: Resampling rate (Hz). Applies to both data-sets
:type taper_length: float
Expand Down Expand Up @@ -580,10 +583,14 @@ def IntervalStackXCorr(refds, tempds,
# end if

# setup logger
stationPair = '%s.%s' % (ref_net_sta, temp_net_sta)
ref_loc = location_preferences_dict[ref_net_sta]
temp_loc = location_preferences_dict[temp_net_sta]
if(ref_loc is None): ref_loc = ''
if(temp_loc is None): temp_loc = ''
stationPair = '%s.%s.%s.%s.%s.%s' % (ref_net_sta, ref_loc, ref_cha, temp_net_sta, temp_loc, temp_cha)
fn = os.path.join(outputPath, '%s.log' % (stationPair if not tracking_tag else
'.'.join([stationPair, tracking_tag])))
logger = setup_logger('%s.%s' % (ref_net_sta, temp_net_sta), fn)
logger = setup_logger(stationPair, fn)

#######################################
# Initialize variables for main loop
Expand All @@ -610,7 +617,10 @@ def IntervalStackXCorr(refds, tempds,
refSt = None
try:
rnc, rsc = ref_net_sta.split('.')
refSt = get_stream(refds, rnc, rsc, ref_cha, cTime, cTime + cStep, baz=baz_ref_net_sta,
refSt = get_stream(refds, rnc, rsc,
ref_cha, cTime, cTime + cStep,
location_preferences_dict,
baz=baz_ref_net_sta,
logger=logger, verbose=verbose)
except Exception as e:
logger.error('\t'+str(e))
Expand All @@ -634,7 +644,10 @@ def IntervalStackXCorr(refds, tempds,
tempSt = None
try:
tnc, tsc = temp_net_sta.split('.')
tempSt = get_stream(tempds, tnc, tsc, temp_cha, cTime, cTime + cStep, baz=baz_temp_net_sta,
tempSt = get_stream(tempds, tnc, tsc,
temp_cha, cTime, cTime + cStep,
location_preferences_dict,
baz=baz_temp_net_sta,
logger=logger, verbose=verbose)
except Exception as e:
logger.error('\t'+str(e))
Expand Down
1 change: 0 additions & 1 deletion tests/test_seismic/xcorqc/data/asdf_file_list1.txt

This file was deleted.

1 change: 0 additions & 1 deletion tests/test_seismic/xcorqc/data/asdf_file_list2.txt

This file was deleted.

Binary file not shown.
Binary file not shown.
Binary file modified tests/test_seismic/xcorqc/data/expected/expected.tar.gz
Binary file not shown.
Binary file added tests/test_seismic/xcorqc/data/test_data_MAW.h5
Binary file not shown.
Binary file added tests/test_seismic/xcorqc/data/test_data_WRAB.h5
Binary file not shown.

0 comments on commit 80dad9c

Please sign in to comment.