Commit

Merge 2cb994e into f69325c
medlin01GA committed Jun 28, 2019
2 parents f69325c + 2cb994e commit baf6699
Showing 9 changed files with 662 additions and 128 deletions.
6 changes: 3 additions & 3 deletions seismic/ASDFdatabase/FederatedASDFDataSet.py
@@ -30,8 +30,8 @@

HAS_RTREE = True
try:
from FederatedASDFDataSetMemVariant import FederatedASDFDataSetMemVariant
except:
from seismic.ASDFdatabase.FederatedASDFDataSetMemVariant import FederatedASDFDataSetMemVariant
except ImportError:
HAS_RTREE = False
# end if

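For orientation, a hedged sketch of how a flag like HAS_RTREE is typically consumed: callers pick the rtree-backed variant when the dependency imports cleanly and fall back to the database-backed variant otherwise (the selection logic below is illustrative, not a quote of this module):

    if HAS_RTREE:
        # rtree is importable: use the in-memory, rtree-indexed variant
        fds = FederatedASDFDataSetMemVariant(asdf_source)
    else:
        # fall back to the sqlite-backed variant shipped alongside it
        from seismic.ASDFdatabase.FederatedASDFDataSetDBVariant import FederatedASDFDataSetDBVariant
        fds = FederatedASDFDataSetDBVariant(asdf_source)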
@@ -184,7 +184,7 @@ def get_waveforms(self, network, station, location, channel, starttime,
:param starttime: start time string in UTCDateTime format; can also be an instance of Obspy UTCDateTime
:param endtime: end time string in UTCDateTime format; can also be an instance of Obspy UTCDateTime
:param automerge: merge traces (default False)
:param trace_count_threshold: returns an empty Stream if the number of traces within the time-rage provided
:param trace_count_threshold: returns an empty Stream if the number of traces within the time-range provided
exceeds the threshold (default 200). This is particularly useful for filtering
out data from bad stations, e.g. those from the AU.Schools network
:return: an Obspy Stream containing waveform data over the time-range provided
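For reference, a minimal usage sketch adapted from the commented example at the bottom of FederatedASDFDataSetMemVariant.py (the asdf_source path is a placeholder; it names a text file listing the ASDF volumes to federate):

    from seismic.ASDFdatabase.FederatedASDFDataSetMemVariant import FederatedASDFDataSetMemVariant

    fds = FederatedASDFDataSetMemVariant('/path/to/asdf_files.txt')
    s = fds.get_waveforms('AU', 'QIS', '*', 'BHZ',
                          '2010-06-01T00:00:00', '2010-06-01T00:06:00',
                          'raw_recording', automerge=False)
    print(s)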
29 changes: 21 additions & 8 deletions seismic/ASDFdatabase/FederatedASDFDataSetDBVariant.py
@@ -139,9 +139,11 @@ def decode_tag(tag, type='raw_recording'):
endttime = UTCDateTime(tokens[2]).timestamp

return nc, sc, lc, cc, starttime, endttime
except:
except Exception:
if self.logger:
self.logger.error("Failed to decode tag {}".format(tag))
return None
# end func
# end try
# end func

dbFound = os.path.exists(self.db_fn)
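As context for decode_tag, a hedged sketch of the waveform-tag layout it appears to parse, i.e. NET.STA.LOC.CHA__start__end__type, which is consistent with tokens[2] feeding the end time above (the sample tag is made up):

    from obspy import UTCDateTime

    tag = 'AU.QIS..BHZ__2010-06-01T00:00:00__2010-06-01T00:06:00__raw_recording'
    tokens = tag.split('__')
    nc, sc, lc, cc = tokens[0].split('.')        # network, station, location, channel
    starttime = UTCDateTime(tokens[1]).timestamp
    endtime = UTCDateTime(tokens[2]).timestamp
    print(nc, sc, lc, cc, starttime, endtime)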
@@ -315,18 +317,27 @@ def get_waveforms(self, network, station, location, channel, starttime,
for row in rows:
ds_id, net, sta, loc, cha, st, et, tag = row
station_data = self.asdf_datasets[ds_id].waveforms['%s.%s'%(net, sta)]
utc_stime = utc_etime = '<unknown>'
try:
s += station_data[tag]
except:
pass
utc_stime = UTCDateTime(starttime)
utc_etime = UTCDateTime(endtime)
data_segment = station_data.get_item(tag, utc_stime, utc_etime)
s += data_segment
except Exception as e:
if self.logger:
self.logger.error("Failed to get data segment {} -- {} for {}.{} with error:\n{}"
.format(utc_stime, utc_etime, net, sta, str(e)))
# end if
# end try
# end for

if(automerge):
try:
s.merge(method=-1)
except:
pass
except Exception as e:
if self.logger:
self.logger.error("Stream merge error:\n{}".format(str(e)))
# end if
# end try
# end if

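A small self-contained sketch of the merge behaviour relied on above: ObsPy's Stream.merge(method=-1) performs a 'cleanup' merge, joining only traces that are perfectly contiguous or identical, and can raise on conflicting data, hence the new try/except with logging:

    import numpy as np
    from obspy import Stream, Trace

    tr1 = Trace(data=np.arange(10, dtype=np.int32))
    tr2 = Trace(data=np.arange(10, dtype=np.int32))
    tr2.stats.starttime = tr1.stats.endtime + tr1.stats.delta  # exactly contiguous

    s = Stream(traces=[tr1, tr2])
    s.merge(method=-1)  # cleanup merge
    print(s)            # expect a single 20-sample trace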
@@ -381,7 +392,9 @@ def local_net_sta_list(self):
try:
start_time = UTCDateTime(workload[nk][sk][0])
end_time = UTCDateTime(workload[nk][sk][1])
except:
except Exception:
if self.logger:
self.logger.warning("Failed to convert start and end times for keys {}, {}".format(nk, sk))
continue
# end try

62 changes: 33 additions & 29 deletions seismic/ASDFdatabase/FederatedASDFDataSetMemVariant.py
@@ -12,6 +12,8 @@
LastUpdate: dd/mm/yyyy Who Optional description
"""

from future.utils import iteritems

from mpi4py import MPI
import os
import glob
@@ -73,7 +75,7 @@ class FederatedASDFDataSetMemVariant():
def hasOverlap(stime1, etime1, stime2, etime2):
result = 0

#print stime1, etime1, stime2, etime2
#print(stime1, etime1, stime2, etime2)

if (etime1 is None): etime1 = UTCDateTime.now().timestamp
if (etime2 is None): etime2 = UTCDateTime.now().timestamp
@@ -117,7 +119,7 @@ def __init__(self, asdf_source, use_json_db=False, logger=None):
if(type(asdf_source)==str):
self.asdf_source = asdf_source

fileContents = filter(len, open(self.asdf_source).read().splitlines())
fileContents = list(filter(len, open(self.asdf_source).read().splitlines()))

# collate file names
for i in range(len(fileContents)):
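The new list(...) wrapper matters because under Python 3 filter() returns a lazy iterator, so the len() and indexing used just above would fail without materialising it first; a tiny illustration (the file name is a placeholder):

    lines = open('asdf_files.txt').read().splitlines()  # placeholder source-list file
    fileContents = list(filter(len, lines))             # drop empty lines, keep a real list
    print(len(fileContents))                            # len() fails on a bare Py3 filter object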
@@ -177,15 +179,15 @@ def __init__(self, asdf_source, use_json_db=False, logger=None):
# end func

def extract_time_ranges(self):
print 'Extracting time ranges on rank %d'%(self.rank)
print('Extracting time ranges on rank %d'%(self.rank))
for it, val in enumerate(self.tree_list):
if(val):
self.net_sta_start_time[it] = tree()
self.net_sta_end_time[it] = tree()
min = []
max = []
for (nk, nv) in val.iteritems():
for (sk, sv) in nv.iteritems():
for (nk, nv) in iteritems(val):
for (sk, sv) in iteritems(nv):

if (type(self.net_sta_start_time[it][nk][sk]) == defaultdict):
self.net_sta_start_time[it][nk][sk] = 1e32
@@ -194,8 +196,8 @@ def extract_time_ranges(self):
self.net_sta_end_time[it][nk][sk] = -1e32
# end if

for (lk, lv) in sv.iteritems():
for (ck, cv) in lv.iteritems():
for (lk, lv) in iteritems(sv):
for (ck, cv) in iteritems(lv):
if (type(cv) == index.Index):
if (len(cv.leaves()[0][1])==0): continue

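The iteritems swap is the standard 2/3 compatibility move: future.utils.iteritems(d) dispatches to d.iteritems() on Python 2 and d.items() on Python 3. A minimal sketch with made-up data:

    from future.utils import iteritems

    workload = {'AU': {'QIS': (0.0, 360.0)}}
    for net, stations in iteritems(workload):      # works under both interpreters
        for sta, (st, et) in iteritems(stations):
            print(net, sta, st, et)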
@@ -222,13 +224,13 @@ def create_index(self):
def create_index(self):
for ifn, fn in enumerate(self.json_db_file_names):
if(fn):
print 'Creating index for %s..'%(fn)
print('Creating index for %s..'%(fn))
d = json.load(open(fn, 'r'))

t = tree()
fieldMap = None
indices = []
for ki, (k, v) in enumerate(d.iteritems()):
for ki, (k, v) in enumerate(iteritems(d)):

if (fieldMap == None):
fieldMap = defaultdict()
@@ -243,7 +245,7 @@ def create_index(self):
fieldMap['tr_endtime']]
# end if

values = v.values()
values = list(v.values())
network = values[indices[0]]
station = values[indices[1]]
channel = values[indices[2]]
@@ -262,7 +264,7 @@ def create_index(self):
self.asdf_tags_list[ifn] = d.keys()

del d
print 'Done creating index'
print('Done creating index')
# end if
# end for
self.extract_time_ranges()
@@ -288,16 +290,16 @@ def decode_tag(tag, type='raw_recording'):

tagsCount = 0
for ids, ds in enumerate(self.asdf_datasets):
print 'Creating index for %s..' % (self.asdf_file_names[ids])
print('Creating index for %s..' % (self.asdf_file_names[ids]))

keys = ds.get_all_coordinates().keys()
keys = list(ds.get_all_coordinates().keys())
keys = split_list(keys, self.nproc)

data = []
#print 'Found %d keys'%(len(keys))
#print('Found %d keys'%(len(keys)))
for ikey, key in enumerate(keys[self.rank]):
sta = ds.waveforms[key]
#print 'Loading key number %d: %s'%(ikey, key)
#print('Loading key number %d: %s'%(ikey, key))
for tag in sta.list():

result = decode_tag(tag)
@@ -325,7 +327,9 @@ def decode_tag(tag, type='raw_recording'):
t[network][station][location][channel] = index.Index()
# end if

assert isinstance(t[network][station][location][channel], index.Index)
t[network][station][location][channel].insert(ki, (tr_st, 1, tr_et, 1))
assert isinstance(t[network][station][location][channel], index.Index)

tags_list.append(tag)
ki += 1
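The (tr_st, 1, tr_et, 1) tuples store 1-D time intervals as degenerate 2-D rectangles so that rtree's intersection query can locate overlapping windows; a hedged sketch with made-up timestamps:

    from rtree import index

    idx = index.Index()
    idx.insert(0, (100.0, 1, 200.0, 1))   # waveform 0 spans t=100..200
    idx.insert(1, (500.0, 1, 600.0, 1))   # waveform 1 spans t=500..600
    hits = list(idx.intersection((150.0, 1, 300.0, 1)))
    print(hits)                           # [0] -- only the first interval overlaps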
@@ -336,7 +340,7 @@ def decode_tag(tag, type='raw_recording'):

tagsCount += len(data)
# end for
print 'Created sparse indices on rank %d for %d waveforms' % (self.rank, tagsCount)
print('Created sparse indices on rank %d for %d waveforms' % (self.rank, tagsCount))

self.extract_time_ranges()
# end func
@@ -347,22 +351,22 @@ def get_global_time_range(self, network, station, location=None, channel=None):

for val in self.tree_list:
if(val):
for (nk, nv) in val.iteritems():
for (nk, nv) in iteritems(val):
if(network):
nk = network
nv = val[nk]
# end if
for (sk, sv) in nv.iteritems():
for (sk, sv) in iteritems(nv):
if (station):
sk = station
sv = nv[sk]
# end if
for (lk, lv) in sv.iteritems():
for (lk, lv) in iteritems(sv):
if (location):
lk = location
lv = sv[lk]
# end if
for (ck, cv) in lv.iteritems():
for (ck, cv) in iteritems(lv):
if (channel):
ck = channel
cv = lv[ck]
@@ -402,28 +406,28 @@ def get_stations(self, starttime, endtime, network=None, station=None, location=

results = []
for i in dslistIndices:
#print 'Accessing file: %s'%(self.asdf_file_names[i])
#print('Accessing file: %s'%(self.asdf_file_names[i]))

ds = self.asdf_datasets[i]

val = self.tree_list[i]
if(val): # has json db
for (nk, nv) in val.iteritems():
for (nk, nv) in iteritems(val):
if(network):
nk = network
nv = val[nk]
# end if
for (sk, sv) in nv.iteritems():
for (sk, sv) in iteritems(nv):
if (station):
sk = station
sv = nv[sk]
# end if
for (lk, lv) in sv.iteritems():
for (lk, lv) in iteritems(sv):
if (location):
lk = location
lv = sv[lk]
# end if
for (ck, cv) in lv.iteritems():
for (ck, cv) in iteritems(lv):
if (channel):
ck = channel
cv = lv[ck]
@@ -477,7 +481,7 @@ def get_waveforms(self, network, station, location, channel, starttime,

s = Stream()
for i in dslistIndices:
#print 'Accessing file: %s'%(self.asdf_file_names[i])
#print('Accessing file: %s'%(self.asdf_file_names[i]))

ds = self.asdf_datasets[i]

@@ -529,8 +533,8 @@ def get_waveforms(self, network, station, location, channel, starttime,
def local_net_sta_list(self):
for it, val in enumerate(self.tree_list):
if(val):
for (nk, nv) in val.iteritems():
for (sk, sv) in nv.iteritems():
for (nk, nv) in iteritems(val):
for (sk, sv) in iteritems(nv):
start_time = None
end_time = None
try:
@@ -563,4 +567,4 @@ def cleanup(self):
#s = fds.get_waveforms('AU', 'QIS', '*', 'BHZ',
# '2010-06-01T00:00:00', '2010-06-01T00:06:00',
# 'raw_recording', automerge=False)
#print s
#print(s)
4 changes: 2 additions & 2 deletions seismic/pick_harvester/createEnsembleXML.py
@@ -261,7 +261,7 @@ def _load_events_helper(self):
try:
utctime = UTCDateTime(year, month, day,
hour, minute, second)
except:
except Exception:
continue
# end try

@@ -306,7 +306,7 @@ def _load_events_helper(self):
try:
utctime = UTCDateTime(year, month, day,
hour, minute, second)
except:
except Exception:
continue
# end try

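The broadened except clauses here guard UTCDateTime construction, which raises on impossible field combinations, so malformed catalogue rows are skipped rather than aborting the load; for example:

    from obspy import UTCDateTime

    try:
        t = UTCDateTime(2019, 2, 30, 0, 0, 0)  # no 30th of February
    except Exception as e:
        print('skipping event with bad origin time:', e)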
