Minor optimizations
geojunky committed Apr 4, 2022
1 parent 619d68e commit 06f464d
Showing 2 changed files with 163 additions and 77 deletions.
102 changes: 78 additions & 24 deletions seismic/ASDFdatabase/asdf2mseed.py
@@ -30,7 +30,7 @@ def split_list(lst, npartitions):
return [lst[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(npartitions)]
# end func

def dump_traces(ds, sn_list, start_date, end_date, length, output_folder):
def dump_traces(ds, sn_list, start_date, end_date, length, min_length_sec, output_folder):
"""
Dump mseed traces from an ASDF file in parallel
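Note: split_list, shown as context above, partitions a list into npartitions near-equal contiguous chunks. A self-contained sketch, assuming the usual divmod prelude that is collapsed in the hunk above:

def split_list(lst, npartitions):
    # assumed prelude (collapsed above): chunk size k, with the remainder m
    # spread one extra item each over the first m chunks
    k, m = divmod(len(lst), npartitions)
    return [lst[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(npartitions)]

print(split_list(list(range(10)), 3))  # -> [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]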
@@ -62,7 +62,7 @@ def dump_traces(ds, sn_list, start_date, end_date, length, output_folder):
try:
st = ds.get_waveforms(net, sta, '*', '*', current_time, current_time+length, tag='raw_recording')
except:
logf.write('Failed to read stream: %s\n' % (fsName))
logf.write('Failed to read stream: %s\n' % ('.'.join([net, sta])))
continue
# end try

@@ -74,6 +74,18 @@ def dump_traces(ds, sn_list, start_date, end_date, length, output_folder):
fsName = '%s.%s-%s.MSEED'%(sn, current_time.timestamp, (current_time+length).timestamp)

try:
if(min_length_sec):
ost = Stream()

for tr in st:
if (tr.stats.npts * tr.stats.delta > min_length_sec):
ost += tr
# end if
# end for

st = ost
# end if

st.write(os.path.join(output_folder, fsName), format="MSEED")
trCounts += len(st)
except:
@@ -87,26 +99,62 @@ def dump_traces(ds, sn_list, start_date, end_date, length, output_folder):
else:
# dump all traces as they appear within the asdf file
logf.write('Exporting mseed files for station: %s\n'%(sn))
sta = ds.waveforms[sn]
for tag in sta.list():
s = sta[tag]

for t in s:
if(not isinstance(t, Trace)): continue
sta = None
try:
sta = ds.waveforms[sn]
except Exception as e:
print(sn, str(e))
# end try

if(sta):
tags = []
try:
tags = sta.list()
except Exception as e:
print(sn, str(e))
# end try

for tag in tags:
if ('raw_recording' not in tag): continue

fsName = '%s.%s-%s.MSEED'%(t.id,
t.stats.starttime.strftime("%y-%m-%d.T%H:%M:%S"),
t.stats.endtime.strftime("%y-%m-%d.T%H:%M:%S"))
st=[]
try:
t.write(os.path.join(os.path.join(output_folder, sn), fsName),
format="MSEED")
trCounts += len(s)
logf.write('Wrote waveform with tag: %s\n' % (tag))
st = sta[tag]
except Exception as e:
logf.write('Failed to write trace: %s\n'%(fsName))
logf.flush()
print(tag, str(e))
continue
# end try

if (min_length_sec):
ost = Stream()

for tr in st:
if (tr.stats.npts * tr.stats.delta > min_length_sec):
ost += tr
# end if
# end for

st = ost
# end if

for t in st:
if(not isinstance(t, Trace)): continue

fsName = '%s.%s-%s.MSEED'%(t.id,
t.stats.starttime.strftime("%y-%m-%d.T%H:%M:%S"),
t.stats.endtime.strftime("%y-%m-%d.T%H:%M:%S"))
try:
t.write(os.path.join(output_folder, fsName),
format="MSEED")
trCounts += len(st)
logf.write('Wrote waveform with tag: %s\n' % (tag))
except Exception as e:
logf.write('Failed to write trace: %s\n'%(fsName))
logf.flush()
# end for
# end for
# end for
# end if
# end if

logf.write('\t Exported (%d) traces.\n' % (trCounts))
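Note: the min_length_sec filter above is inlined twice, once per mode. A minimal sketch of an equivalent reusable helper, assuming obspy's Stream and Trace types (the helper name is hypothetical, not part of this commit):

from obspy.core import Stream

def drop_short_traces(st, min_length_sec):
    # Keep only traces strictly longer than min_length_sec seconds; a falsy
    # min_length_sec disables filtering, mirroring the inlined checks above.
    if not min_length_sec:
        return st
    ost = Stream()
    for tr in st:
        if tr.stats.npts * tr.stats.delta > min_length_sec:  # duration in seconds
            ost += tr
    return ost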
@@ -134,16 +182,18 @@ def dump_traces(ds, sn_list, start_date, end_date, length, output_folder):
@click.option('--length', default=86400,
help="Length of each trace in seconds. If specified, both 'start-date' and 'end-date' must be specified; "
"otherwise this parameter is ignored; default is 86400 s (1 day)")
def process(input_asdf, output_folder, start_date, end_date, length):
@click.option('--min-length-sec', type=int, default=250, help="Minimum length in seconds")
def process(input_asdf, output_folder, start_date, end_date, length, min_length_sec):
"""
INPUT_ASDF: Path to input ASDF file\n
OUTPUT_FOLDER: Output folder \n
The script has two modes of operations: \n
The script has two modes of operation: \n
i. parameters --start-date, --end-date and --length (default 1 day) are specified, which results in
mseed files, each --length seconds long, being output for the time-range specified \n
mseed files, each --length seconds long or shorter (but longer than min_length_sec, if specified),
being output for the time-range specified \n
ii. parameters --start-date and --end-date are not provided, which results in the dumping of all
waveforms (of any arbitrary lengths) as they appear in the ASDF file \n
waveforms (longer than min_length_sec, if specified) as they appear in the ASDF file \n
\n
Example usage:
mpirun -np 2 python asdf2mseed.py ./test.asdf /tmp/output --start-date 2013-01-01T00:00:00 --end-date 2016-01-01T00:00:00
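A corresponding mode-ii invocation, using the --min-length-sec option added in this commit (paths hypothetical), might be:

mpirun -np 2 python asdf2mseed.py ./test.asdf /tmp/output --min-length-sec 300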
@@ -168,7 +218,7 @@ def process(input_asdf, output_folder, start_date, end_date, length):
if(rank == 0):
# split work over stations

stations = list(ds.get_all_coordinates().keys())
stations = ds.waveforms.list()
meta = ds.get_all_coordinates()
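# Note: ds.waveforms.list() can include stations that have no entry in
# get_all_coordinates(); presumably that is why the station-list write
# below now tolerates missing coordinate metadata.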

proc_stations = split_list(stations, nproc)
@@ -178,7 +228,11 @@ def process(input_asdf, output_folder, start_date, end_date, length):
f = open(fn, 'w+')
f.write('#Station\t\tLongitude\t\tLatitude\n')
for sn in stations:
f.write('%s\t\t%f\t\t%f\n'%(sn, meta[sn]['longitude'], meta[sn]['latitude']))
try:
f.write('%s\t\t%f\t\t%f\n'%(sn, meta[sn]['longitude'], meta[sn]['latitude']))
except:
continue
# end try
# end for
f.close()
# end if
@@ -187,7 +241,7 @@ def process(input_asdf, output_folder, start_date, end_date, length):
proc_stations = comm.bcast(proc_stations, root=0)

print (proc_stations[rank])
dump_traces(ds, proc_stations[rank], start_date, end_date, length, output_folder)
dump_traces(ds, proc_stations[rank], start_date, end_date, length, min_length_sec, output_folder)

del ds
# end func
138 changes: 85 additions & 53 deletions seismic/ASDFdatabase/cwb2asdf/cwb2asdf.py
@@ -14,7 +14,7 @@
"""

import glob
import os
import os, fnmatch, re
import random
from collections import defaultdict

@@ -24,6 +24,7 @@
from mpi4py import MPI
from obspy import read
from obspy.core.inventory import read_inventory
from obspy.core import Stream
from ordered_set import OrderedSet as set
from tqdm import tqdm

@@ -51,7 +52,7 @@ def make_ASDF_tag(tr, tag):
# end func

CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])

BUFFER_LENGTH = 1000

@click.command(context_settings=CONTEXT_SETTINGS)
@click.argument('input-folder', required=True,
@@ -70,42 +71,73 @@ def process(input_folder, inventory, output_file_name, min_length_sec, merge_thr
INVENTORY: Path to FDSNStationXML inventory containing channel-level metadata for all stations \n
OUTPUT_FILE_NAME: Name of output ASDF file \n
"""

def _write(ds, ostream, inventory_dict, netsta_set):
try:
ds.add_waveforms(ostream, tag='raw_recording')
except Exception as e:
print(e)
print('Failed to append stream:')
print(ostream)
# end try

for item in netsta_set:
try:
ds.add_stationxml(inventory_dict[item])
except Exception as e:
print(e)
print('Failed to append inventory:')
print((inventory_dict[item]))
# end try
# end for
# end func
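# _write flushes the buffered Stream with a single add_waveforms call and then
# appends StationXML once per net.sta seen in the buffer; batching the HDF5
# writes this way is presumably the optimization the commit title refers to.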

comm = MPI.COMM_WORLD
nproc = comm.Get_size()
rank = comm.Get_rank()

# Read inventory
inv = None
try:
inv = read_inventory(inventory)
except Exception as e:
print(e)
# end try

files = np.array(glob.glob(input_folder + '/*.mseed'))
random.Random(nproc).shuffle(files)
# files = files[:100]

ustations = set()
files = None
my_files = None
ustationInv = defaultdict(list)
networklist = []
stationlist = []
for file in files:
_, _, net, sta, _ = file.split('.')
ustations.add('%s.%s' % (net, sta))
networklist.append(net)
stationlist.append(sta)
# end for
if(rank == 0):
# Read inventory
inv = None
try:
inv = read_inventory(inventory)
except Exception as e:
print(e)
# end try

networklist = np.array(networklist)
stationlist = np.array(stationlist)
# generate a list of files
paths = [i for i in os.listdir(input_folder) if os.path.isfile(os.path.join(input_folder, i))]
expr = re.compile(fnmatch.translate('*.mseed'), re.IGNORECASE)
files = [os.path.join(input_folder, j) for j in paths if re.match(expr, j)]
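# fnmatch.translate('*.mseed') turns the glob into a regex; compiling it with
# re.IGNORECASE also picks up upper-case '.MSEED' names, which the previous
# glob.glob(input_folder + '/*.mseed') call would have missed.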

files = np.array(files)
random.Random(nproc).shuffle(files)
# files = files[:100]

ustations = set()
networklist = []
stationlist = []
for file in files:
_, _, net, sta, _ = file.split('.')
#tokens = os.path.basename(file).split('.')
#net, sta = tokens[0], tokens[1]

ustations.add('%s.%s' % (net, sta))
networklist.append(net)
stationlist.append(sta)
# end for

idx = np.lexsort((networklist, stationlist))
files = files[idx]
networklist = np.array(networklist)
stationlist = np.array(stationlist)

myfiles = split_list(files, nproc)[rank]
idx = np.lexsort((networklist, stationlist))
files = files[idx]
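# np.lexsort uses its last key as the primary sort key, so the files are
# ordered by station first and network second before being split across ranks.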
my_files = split_list(files, nproc)

if (rank == 0):
# station inventories
for i, ustation in enumerate(ustations):
net, sta = ustation.split('.')
sinv = inv.select(network=net, station=sta)
@@ -117,11 +149,13 @@ def process(input_folder, inventory, output_file_name, min_length_sec, merge_thr
# end if
# end for
# end if

myfiles = comm.scatter(my_files, root=0)
ustationInv = comm.bcast(ustationInv, root=0)
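# Rank 0 now builds the file list and reads the inventory; each rank receives
# its share of files via scatter and the station inventories via broadcast,
# rather than every rank repeating the directory scan and inventory read.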

# Extract trace-count-lists in parallel
mytrccountlist = np.zeros(len(myfiles))
for ifile, file in enumerate(tqdm(myfiles)):
for ifile, file in enumerate(tqdm(myfiles, desc='Rank: {}'.format(rank))):
try:
st = read(file, headonly=True)
mytrccountlist[ifile] = len(st)
@@ -153,7 +187,9 @@ def process(input_folder, inventory, output_file_name, min_length_sec, merge_thr
if (os.path.exists(output_file_name)): os.remove(output_file_name)
ds = pyasdf.ASDFDataSet(output_file_name, compression='gzip-3', mpi=False)

for ifile, file in enumerate(tqdm(files)):
ostream = Stream()
netsta_set = set()
for ifile, file in enumerate(tqdm(files, desc='Rank: {}'.format(rank))):
st = []
if (trccountlist[ifile] > ntraces_per_file):
continue
@@ -184,36 +220,32 @@ def process(input_folder, inventory, output_file_name, min_length_sec, merge_thr
# end if
# end if

for tr in st:
if(len(ostream) < BUFFER_LENGTH):
for tr in st:

if (tr.stats.npts == 0): continue
if (min_length_sec):
if (tr.stats.npts * tr.stats.delta < min_length_sec): continue
# end if

asdfTag = make_ASDF_tag(tr, "raw_recording").encode('ascii')
if (tr.stats.npts == 0): continue
if (min_length_sec):
if (tr.stats.npts * tr.stats.delta < min_length_sec): continue
# end if

try:
ds.add_waveforms(tr, tag='raw_recording')
except Exception as e:
print(e)
print('Failed to append trace:')
print(tr)
# end try
# end for
ostream += tr
# end for

try:
ds.add_stationxml(ustationInv[netsta])
except Exception as e:
print(e)
print('Failed to append inventory:')
print((ustationInv[netsta]))
# end try
netsta_set.add(netsta)
else:
_write(ds, ostream, ustationInv, netsta_set)
ostream = Stream()
netsta_set = set()
# end if
# end if
# end for

_write(ds, ostream, ustationInv, netsta_set)

print('Closing asdf file..')
del ds
# end if
# end func
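The buffer-and-flush pattern above generalizes; a minimal self-contained sketch under the same obspy/pyasdf assumptions (function name hypothetical). Note the sketch appends the current trace after flushing, so no trace is skipped at the buffer boundary:

from obspy.core import Stream

BUFFER_LENGTH = 1000  # flush threshold, as defined near the top of this file

def buffered_add(ds, traces):
    # Accumulate traces into a Stream and write them in batches, flushing
    # whenever the buffer reaches BUFFER_LENGTH traces.
    buf = Stream()
    for tr in traces:
        if len(buf) >= BUFFER_LENGTH:
            ds.add_waveforms(buf, tag='raw_recording')
            buf = Stream()
        buf += tr
    if len(buf) > 0:  # final partial flush
        ds.add_waveforms(buf, tag='raw_recording')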

####################################################################################################################
# Example commandline run:
