Skip to content

Commit

Permalink
Merge b9e1275 into 3d4e0c3
Browse files Browse the repository at this point in the history
  • Loading branch information
geojunky authored Aug 27, 2019
2 parents 3d4e0c3 + b9e1275 commit 184ac7b
Show file tree
Hide file tree
Showing 6 changed files with 258 additions and 26 deletions.
195 changes: 195 additions & 0 deletions seismic/ASDFdatabase/asdf_preprocess.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
#!/usr/env python
"""
Description:
Reads waveforms from an ASDF file, optionally applies instrument response correction,
resamples and outputs them to another ASDF file. This preprocessing is crucial for
large-scale studies involving > 10000 Green's Functions, e.g. in ambient noise
tomography. This approach significantly reduces IO bottlenecks and computational
costs associated with having to apply instrument response corrections on data from
a given station in alternative workflows.
References:
CreationDate: 18/07/19
Developer: rakib.hassan@ga.gov.au
Revision History:
LastUpdate: 18/07/19 RH
LastUpdate: dd/mm/yyyy Who Optional description
"""

import click
import os
from mpi4py import MPI
import pyasdf
from tqdm import tqdm
from obspy import UTCDateTime, read_inventory, Inventory, Stream
from collections import defaultdict
from obspy.core.util.misc import get_window_times
import gc
from obspy.core.util.misc import limit_numpy_fft_cache

def split_list(lst, npartitions):
k, m = divmod(len(lst), npartitions)
return [lst[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(npartitions)]
# end func

def getStationInventory(master_inventory, inventory_cache, netsta):
netstaInv = None
if (master_inventory):
if (inventory_cache is None): inventory_cache = defaultdict(list)
net, sta = netsta.split('.')

if (isinstance(inventory_cache[netsta], Inventory)):
netstaInv = inventory_cache[netsta]
else:
inv = master_inventory.select(network=net, station=sta)
if(len(inv.networks)):
inventory_cache[netsta] = inv
netstaInv = inv
# end if
# end if
# end if

return netstaInv, inventory_cache
# end func

def create_station_asdf(input_asdf, output_folder, resample_rate,
instrument_response_inventory, instrument_response_output, water_level):
# mpi attributes
comm = MPI.COMM_WORLD
nproc = comm.Get_size()
rank = comm.Get_rank()

# input asdf file
ids = pyasdf.ASDFDataSet(input_asdf, mode='r')

# get stations
stations = ids.get_all_coordinates().keys()

# local work-load
stations = split_list(stations, nproc)[rank]

# read inventory
stationInvCache = None
# read inventory
inv = None
try:
inv = read_inventory(instrument_response_inventory)
except Exception as e:
print (e)
raise RuntimeError('Failed to read inventory: %s' % (instrument_response_inventory))
# end try

for s in stations:
# output asdf file
ofn = os.path.join(output_folder,
os.path.splitext(os.path.basename(input_asdf))[0] + '.%s.h5'%(s))

if (os.path.exists(ofn)): os.remove(ofn)
ods = pyasdf.ASDFDataSet(ofn, mode='w', mpi=False, compression='gzip-3')

sta = ids.waveforms[s]
for tag in tqdm(sta.list(), desc='Rank %d, Station %s:'%(rank, s)):
# get response object
sinv, stationInvCache = getStationInventory(inv, stationInvCache, s)

st = sta[tag]
dayst = Stream()
for tr in st:
start_time = tr.stats.starttime
offset = (UTCDateTime(year=start_time.year, month=start_time.month,
day=start_time.day) - start_time)
for wtr in tr.slide(3600*24, 3600*24, offset=offset, include_partial_windows=True):
wtr = wtr.copy()
dayst += wtr
# end for
# end for
gc.collect()

# remove response
if(sinv):
for tr in dayst:
limit_numpy_fft_cache(max_size_in_mb_per_cache=10)
try:
tr.remove_response(sinv, output=instrument_response_output.upper(),
water_level=water_level)
except Exception as e:
print (e)
# end try
gc.collect()
# end for
# end if

# detrend and taper
taper_length = 20.0 # seconds
for tr in dayst:
if tr.stats.npts < 4 * taper_length * tr.stats.sampling_rate:
dayst.remove(tr)
else:
tr.detrend(type="demean")
tr.detrend(type="linear")
tr.taper(max_percentage=None, max_length=1.0)
# end if
# end for
gc.collect()

# apply low-pass filter and create day traces
for tr in dayst:
tr.filter('lowpass', freq=resample_rate * 0.5, corners=6, zerophase=True)
tr.interpolate(resample_rate, method='weighted_average_slopes')
# end for
gc.collect()

# add traces
for tr in dayst:
try:
ods.add_waveforms(tr, tag='raw_recording')
except Exception as e:
print (e)
print (tr)
# end try
# end for
#break
# end for
gc.collect()

ods.add_stationxml(ids.waveforms[s].StationXML)

print ('Closing asdf file..')
del ods

#break
# end for
del ids
# end func

CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
@click.command(context_settings=CONTEXT_SETTINGS)
@click.argument('input-asdf', required=True,
type=click.Path(exists=True))
@click.argument('output-folder', required=True,
type=click.Path(exists=True))
@click.option('--resample-rate', default=10,
help="Resample rate in Hz; default is 10 Hz")
@click.option('--instrument-response-inventory', default=None,
type=click.Path('r'),
help="FDSNxml inventory containing instrument response information. Note that when this parameter is provided "
", instrument response corrections are automatically applied for matching stations with response "
"information.")
@click.option('--instrument-response-output',
type=click.Choice(['vel', 'disp']),
default='vel', help="Output of instrument response correction; must be either 'vel' (default) for velocity"
" or 'disp' for displacement. Note, this parameter has no effect if instrument response"
" correction is not performed.")
@click.option('--water-level', default=50., help="Water-level in dB to limit amplification during instrument response correction"
"to a certain cut-off value. Note, this parameter has no effect if instrument"
"response correction is not performed.")
def process(input_asdf, output_folder, resample_rate, instrument_response_inventory,
instrument_response_output, water_level):

create_station_asdf(input_asdf, output_folder, resample_rate, instrument_response_inventory,
instrument_response_output, water_level)
# end func

if (__name__ == '__main__'):
process()
20 changes: 16 additions & 4 deletions seismic/pick_harvester/pick.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ def outputConfigParameters():
# Retrieve estimated workload
# ==================================================
taupyModel = TauPyModel(model='iasp91')
fds = FederatedASDFDataSet(asdf_source, use_json_db=False, logger=None)
fds = FederatedASDFDataSet(asdf_source, logger=None)
workload = getWorkloadEstimate(fds, originTimestamps)

# ==================================================
Expand Down Expand Up @@ -445,9 +445,15 @@ def outputConfigParameters():
st = fds.get_waveforms(codes[0], codes[1], codes[2], codes[3],
curr,
curr + step,
automerge=True,
trace_count_threshold=200)

try:
st.merge(method=-1)
except Exception as e:
print (e)
continue
# end try

if (len(st) == 0): continue
dropBogusTraces(st)

Expand Down Expand Up @@ -521,14 +527,20 @@ def outputConfigParameters():
stn = fds.get_waveforms(codesn[0], codesn[1], codesn[2], codesn[3],
curr,
curr + step,
automerge=True,
trace_count_threshold=200)
ste = fds.get_waveforms(codese[0], codese[1], codese[2], codese[3],
curr,
curr + step,
automerge=True,
trace_count_threshold=200)

try:
stn.merge(method=-1)
ste.merge(method=-1)
except Exception as e:
print (e)
continue
# end try

dropBogusTraces(stn)
dropBogusTraces(ste)

Expand Down
16 changes: 11 additions & 5 deletions seismic/xcorqc/correlator.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def process(data_source1, data_source2, output_path,
clip_to_2std=False, whitening=False, one_bit_normalize=False, read_buffer_size=10,
ds1_zchan=None, ds1_nchan=None, ds1_echan=None,
ds2_zchan=None, ds2_nchan=None, ds2_echan=None, corr_chan=None,
envelope_normalize=False, ensemble_stack=False, restart=False):
envelope_normalize=False, ensemble_stack=False, restart=False, no_tracking_tag=False):
"""
DATA_SOURCE1: Text file containing paths to ASDF files \n
DATA_SOURCE2: Text file containing paths to ASDF files \n
Expand All @@ -157,11 +157,15 @@ def process(data_source1, data_source2, output_path,
if (rank == 0):
# Register time tag with high resolution, since queued jobs can readily
# commence around the same time.
time_tag = UTCDateTime.now().strftime("%y-%m-%d.T%H.%M.%S.%f")

if(no_tracking_tag):
time_tag = None
else:
time_tag = UTCDateTime.now().strftime("%y-%m-%d.T%H.%M.%S.%f")

def outputConfigParameters():
# output config parameters
fn = 'correlator.%s.cfg' % (time_tag)
fn = 'correlator.%s.cfg' % (time_tag) if time_tag else 'correlator.cfg'
fn = os.path.join(output_path, fn)

f = open(fn, 'w+')
Expand Down Expand Up @@ -191,6 +195,7 @@ def outputConfigParameters():
f.write('%25s\t\t\t: %s\n' % ('--whitening', whitening))
f.write('%25s\t\t\t: %s\n' % ('--ensemble-stack', ensemble_stack))
f.write('%25s\t\t\t: %s\n' % ('--restart', 'TRUE' if restart else 'FALSE'))
f.write('%25s\t\t\t: %s\n' % ('--no-tracking-tag', 'TRUE' if no_tracking_tag else 'FALSE'))

f.close()
# end func
Expand Down Expand Up @@ -354,11 +359,12 @@ def outputConfigParameters():
"single CC function, aimed at producing empirical Greens "
"functions for surface wave tomography.")
@click.option('--restart', default=False, is_flag=True, help='Restart job')
@click.option('--no-tracking-tag', default=False, is_flag=True, help='Do not tag output file names with a time-tag')
def main(data_source1, data_source2, output_path, interval_seconds, window_seconds, resample_rate,
nearest_neighbours, fmin, fmax, station_names1, station_names2, start_time,
end_time, instrument_response_inventory, instrument_response_output, water_level, clip_to_2std,
whitening, one_bit_normalize, read_buffer_size, ds1_zchan, ds1_nchan, ds1_echan, ds2_zchan,
ds2_nchan, ds2_echan, corr_chan, envelope_normalize, ensemble_stack, restart):
ds2_nchan, ds2_echan, corr_chan, envelope_normalize, ensemble_stack, restart, no_tracking_tag):
"""
DATA_SOURCE1: Path to ASDF file \n
DATA_SOURCE2: Path to ASDF file \n
Expand All @@ -376,7 +382,7 @@ def main(data_source1, data_source2, output_path, interval_seconds, window_secon
nearest_neighbours, fmin, fmax, station_names1, station_names2, start_time,
end_time, instrument_response_inventory, instrument_response_output, water_level, clip_to_2std,
whitening, one_bit_normalize, read_buffer_size, ds1_zchan, ds1_nchan, ds1_echan, ds2_zchan,
ds2_nchan, ds2_echan, corr_chan, envelope_normalize, ensemble_stack, restart)
ds2_nchan, ds2_echan, corr_chan, envelope_normalize, ensemble_stack, restart, no_tracking_tag)
# end func

if __name__ == '__main__':
Expand Down
Loading

0 comments on commit 184ac7b

Please sign in to comment.