Commit 94373ce: Fixing merge conflict

geojunky committed Feb 10, 2020
2 parents: c5f91f5 + 099e303

seismic/ASDFdatabase/asdf2mseed.py (49 additions, 56 deletions)
@@ -24,6 +24,11 @@
from obspy.core.trace import Trace
import click

def split_list(lst, npartitions):
k, m = divmod(len(lst), npartitions)
return [lst[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(npartitions)]
# end func

def dump_traces(ds, sn_list, start_date, end_date, length, output_folder):
"""
Dump mseed traces from an ASDF file in parallel
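
Note on the new split_list helper: it divides a list into npartitions contiguous chunks whose sizes differ by at most one, which is how the station workload is later spread across MPI ranks. A minimal standalone sketch of its behaviour, using hypothetical station codes:

    def split_list(lst, npartitions):
        k, m = divmod(len(lst), npartitions)
        return [lst[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(npartitions)]

    stations = ['AU.STA%02d' % i for i in range(7)]
    print(split_list(stations, 3))
    # [['AU.STA00', 'AU.STA01', 'AU.STA02'], ['AU.STA03', 'AU.STA04'], ['AU.STA05', 'AU.STA06']]
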
@@ -39,44 +44,42 @@ def dump_traces(ds, sn_list, start_date, end_date, length, output_folder):
for sn in sn_list:
logf = open(os.path.join(output_folder, '%s.log.txt'%(sn)), "w+")

if(not os.path.exists(os.path.join(output_folder, sn))):
os.system('mkdir %s'%os.path.join(output_folder, sn))

trCounts = np.zeros(3)
trCounts = 0
if(start_date and end_date):
# dump traces within given time-range and of given length each
current_time = start_date

logf.write('Exporting mseed files for station: %s\n'%(sn))
while(current_time < end_date):
zst = ds.ds_jason_db.fetchDataByTime(ds.ds, sn, ds.zchannel,
current_time,
current_time+length)

nst = ds.ds_jason_db.fetchDataByTime(ds.ds, sn, ds.nchannel,
current_time,
current_time+length)
if(end_date - current_time < length):
length = end_date - current_time

est = ds.ds_jason_db.fetchDataByTime(ds.ds, sn, ds.echannel,
current_time,
current_time+length)
net, sta = sn.split('.')

for i, st in enumerate([zst, nst, est]):
if(st is None): continue
st = None
try:
st = ds.get_waveforms(net, sta, '*', '*', current_time, current_time+length, tag='raw_recording')
except:
logf.write('Failed to read stream: %s\n' % (fsName))
continue
# end try

fsName = '%s.%s.MSEED'%(st.traces[0].id, UTCDateTime(current_time).strftime("%y-%m-%d.T%H:%M:%S"))
if(len(st)==0):
current_time += length
continue
# end if

try:
st.merge(method=1, fill_value=0)
st.write(os.path.join(os.path.join(OUTPUT_PATH, sn), fsName),
format="MSEED")
trCounts[i] += 1
fsName = '%s.%s-%s.MSEED'%(sn, current_time.timestamp, (current_time+length).timestamp)

try:
st.write(os.path.join(output_folder, fsName), format="MSEED")
trCounts += len(st)
except:
logf.write('Failed to write stream: %s\n'%(fsName))
logf.flush()
# end try

#st.plot()
except:
logf.write('Failed to write stream: %s\n'%(fsName))
logf.flush()
# end for
#break
current_time += length
# wend
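
Note on the reworked loop above: the per-channel fetchDataByTime calls are replaced by a single pyasdf get_waveforms query per time window, merged across channels into one stream per window. A simplified, self-contained sketch of that windowed-export pattern; the file name, station code, dates and output path are hypothetical:

    import os
    import pyasdf
    from obspy import UTCDateTime

    ds = pyasdf.ASDFDataSet('example.h5', mode='r')   # hypothetical input file
    sn = 'AU.STA01'                                   # hypothetical NET.STA code
    net, sta = sn.split('.')
    t, end, length = UTCDateTime('2020-01-01'), UTCDateTime('2020-01-03'), 86400

    while t < end:
        if end - t < length:
            length = end - t                          # clip the final window
        st = ds.get_waveforms(net, sta, '*', '*', t, t + length, tag='raw_recording')
        if len(st):
            fn = '%s.%s-%s.MSEED' % (sn, t.timestamp, (t + length).timestamp)
            st.write(os.path.join('/tmp', fn), format='MSEED')
        t += length
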
@@ -96,23 +99,20 @@ def dump_traces(ds, sn_list, start_date, end_date, length, output_folder):
try:
t.write(os.path.join(os.path.join(output_folder, sn), fsName),
format="MSEED")
trCounts[0] += 1
trCounts += len(s)
logf.write('Wrote waveform with tag: %s\n' % (tag))
except Exception as e:
logf.write('Failed to write trace: %s\n'%(fsName))
logf.flush()
# end for
# end for
# end if
# end for

if(start_date and end_date):
logf.write('\t Exported (%d)Z, (%d)N and (%d)E traces.\n' % (trCounts[0], trCounts[1], trCounts[2]))
else:
logf.write('\t Exported %d traces.\n' % (trCounts[0]))
# end if
logf.flush()
logf.close()
logf.write('\t Exported (%d) traces.\n' % (trCounts))

logf.flush()
logf.close()
# end for
# end func
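
Note on the else branch (no time range given): every waveform stored for the station is written out trace by trace, tag by tag. A rough sketch of that access pattern in pyasdf, with a hypothetical file and station code:

    import pyasdf

    ds = pyasdf.ASDFDataSet('example.h5', mode='r')   # hypothetical input file
    accessor = ds.waveforms['AU.STA01']               # hypothetical NET.STA code
    for tag in accessor.get_waveform_tags():
        st = getattr(accessor, tag)                   # obspy Stream for this tag
        for tr in st:
            tr.write('%s.%s.MSEED' % (tr.id, tag), format='MSEED')
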


@@ -130,9 +130,9 @@ def dump_traces(ds, sn_list, start_date, end_date, length, output_folder):
help="End date-time in UTC format. If specified, both 'start-date' and 'length' must be specified; "
"otherwise this parameter is ignored.",
type=str)
@click.option('--length', default=3600,
@click.option('--length', default=86400,
help="Length of each trace in seconds. If specified, both 'start-date' and 'end-date' must be specified; "
"otherwise this parameter is ignored.")
"otherwise this parameter is ignored; default is 86400 s (1 day)")
def process(input_asdf, output_folder, start_date, end_date, length):
"""
INPUT_ASDF: Path to input ASDF file\n
@@ -145,34 +145,26 @@ def process(input_asdf, output_folder, start_date, end_date, length):
"""

try:
start_date = UTCDateTime(start_date).timestamp if start_date else None
end_date = UTCDateTime(end_date).timestamp if end_date else None
length = int(length)
start_date = UTCDateTime(start_date) if start_date else None
end_date = UTCDateTime(end_date) if end_date else None
length = int(length)
except:
assert 0, 'Invalid input'
# end try
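
Note on the date handling here: start_date and end_date are now kept as UTCDateTime objects rather than raw .timestamp floats, so the window arithmetic in dump_traces stays in ObsPy's time type. A quick illustration with assumed inputs:

    from obspy import UTCDateTime

    start = UTCDateTime('2020-01-01T00:00:00')
    end = UTCDateTime('2020-01-02T00:00:00')
    print(end - start)               # subtraction gives seconds as a float: 86400.0
    print((start + 3600).timestamp)  # addition takes seconds; .timestamp is epoch seconds
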

comm = MPI.COMM_WORLD
nproc = comm.Get_size()
rank = comm.Get_rank()
proc_stations = defaultdict(list)
ds = pyasdf.ASDFDataSet(input_asdf)
proc_stations = None
ds = pyasdf.ASDFDataSet(input_asdf, mode='r')

if(rank == 0):
# split work over stations in ds1 for the time being
# split work over stations

stations = ds.get_all_coordinates().keys()
stations = list(ds.get_all_coordinates().keys())
meta = ds.get_all_coordinates()
count = 0
for iproc in np.arange(nproc):
for istation in np.arange(np.divide(len(stations), nproc)):
proc_stations[iproc].append(stations[count])
count += 1
# end for
for iproc in np.arange(np.mod(len(stations), nproc)):
proc_stations[iproc].append(stations[count])
count += 1
# end for

proc_stations = split_list(stations, nproc)

# output station meta-data
fn = os.path.join(output_folder, 'stations.txt')
@@ -187,9 +179,10 @@ def process(input_asdf, output_folder, start_date, end_date, length):
# broadcast workload to all procs
proc_stations = comm.bcast(proc_stations, root=0)

print proc_stations
print (proc_stations[rank])
dump_traces(ds, proc_stations[rank], start_date, end_date, length, output_folder)
# end if

del ds
# end func
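
Note on the workload distribution: rank 0 partitions the station list with split_list and broadcasts the whole plan, after which each rank indexes its own share. A minimal runnable sketch of that pattern (station codes hypothetical; launch under e.g. mpirun -np 4):

    from mpi4py import MPI

    comm = MPI.COMM_WORLD
    nproc = comm.Get_size()
    rank = comm.Get_rank()

    work = None
    if rank == 0:
        stations = ['AU.STA%02d' % i for i in range(10)]  # hypothetical codes
        k, m = divmod(len(stations), nproc)
        work = [stations[i * k + min(i, m):(i + 1) * k + min(i + 1, m)]
                for i in range(nproc)]
    work = comm.bcast(work, root=0)                       # all ranks get the full plan
    print('rank %d handles %s' % (rank, work[rank]))
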

if (__name__ == '__main__'):
