Miscellaneous Changes
* Added an option for multi-threaded access to FederatedASDFDataSet
* Minor changes to parallelization in pick_eqt
geojunky committed Aug 7, 2023
1 parent b4db961 commit 8652cb3
Showing 3 changed files with 43 additions and 36 deletions.
12 changes: 9 additions & 3 deletions seismic/ASDFdatabase/FederatedASDFDataSet.py
@@ -22,13 +22,19 @@
from seismic.misc import rtp2xyz

class FederatedASDFDataSet():
def __init__(self, asdf_source, logger=None, single_item_read_limit_in_mb=1024):
def __init__(self, asdf_source, logger=None,
single_item_read_limit_in_mb=1024,
single_threaded_access=True):
"""
Initializer for FederatedASDFDataSet.
:param asdf_source: Path to a text file containing a list of ASDF files. \
Entries can be commented out with '#'
:param logger: logger instance
:param single_item_read_limit_in_mb: buffer size for Obspy reads
:param single_threaded_access: By default, data are read by single-threaded MPI processes.
Setting this to False relaxes that restriction for threaded GUI applications, though data
access itself still remains single-threaded.
"""
self.logger = logger
self.asdf_source = asdf_source
@@ -37,7 +43,8 @@ def __init__(self, asdf_source, logger=None, single_item_read_limit_in_mb=1024):

# Instantiate implementation class
self.fds = _FederatedASDFDataSetImpl(asdf_source, logger=logger,
single_item_read_limit_in_mb=single_item_read_limit_in_mb)
single_item_read_limit_in_mb=single_item_read_limit_in_mb,
single_threaded_access=single_threaded_access)

# Populate coordinates
self._unique_coordinates = defaultdict(list)
@@ -62,7 +69,6 @@ def __init__(self, asdf_source, logger=None, single_item_read_limit_in_mb=1024):

self._tree = cKDTree(xyzs)
self._key_list = np.array(list(rtps_dict.keys()))

# end func

@property
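A minimal usage sketch of the new option, assuming the class is imported from seismic.ASDFdatabase.FederatedASDFDataSet (the list-file path and station code below are placeholders):

    # Sketch only: open the dataset so that it can also be queried from
    # worker threads, e.g. in a GUI application. Path and station code
    # are placeholders.
    from seismic.ASDFdatabase.FederatedASDFDataSet import FederatedASDFDataSet

    fds = FederatedASDFDataSet('/path/to/asdf_files.txt',
                               single_threaded_access=False)

    # Queries work as before; the caller is still responsible for
    # serializing concurrent access from multiple threads.
    start_time, end_time = fds.get_global_time_range('AU', 'ARMA')

Leaving single_threaded_access at its default of True preserves the previous behaviour, where the underlying SQLite connection may only be used by the thread that created it.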
28 changes: 20 additions & 8 deletions seismic/ASDFdatabase/_FederatedASDFDataSetImpl.py
@@ -73,18 +73,25 @@ def split_list_by_timespan(l, n):
# end func

class _FederatedASDFDataSetImpl():
def __init__(self, asdf_source, logger=None, single_item_read_limit_in_mb=1024):
def __init__(self, asdf_source, logger=None,
single_item_read_limit_in_mb=1024,
single_threaded_access=True):
"""
:param asdf_source: path to a text file containing a list of ASDF files:
Entries can be commented out with '#'
:param logger: logger instance
:param single_item_read_limit_in_mb: buffer size for Obspy reads
:param single_threaded_access: By default, data are read by single-threaded MPI processes.
Setting this to False relaxes that restriction for threaded GUI applications, though data
access itself still remains single-threaded.
"""

self.comm = MPI.COMM_WORLD
self.nproc = self.comm.Get_size()
self.rank = self.comm.Get_rank()

self.logger = logger
self.single_threaded_access = single_threaded_access
self.asdf_source = None
self.asdf_file_names = []
self.asdf_station_coordinates = []
@@ -320,10 +327,12 @@ def decode_tag(tag, type='raw_recording'):

if(dbFound):
print(('Found database: %s'%(self.db_fn)))
self.conn = sqlite3.connect(self.db_fn)
self.conn = sqlite3.connect(self.db_fn,
check_same_thread=self.single_threaded_access)
else:
if(self.rank==0):
self.conn = sqlite3.connect(self.db_fn)
self.conn = sqlite3.connect(self.db_fn,
check_same_thread=self.single_threaded_access)
self.conn.execute('create table wdb(ds_id smallint, net varchar(6), sta varchar(6), loc varchar(6), '
'cha varchar(6), st double, et double, tag text)')
self.conn.execute('create table netsta(ds_id smallint, net varchar(6), sta varchar(6), lon double, '
@@ -388,7 +397,8 @@ def decode_tag(tag, type='raw_recording'):
for irank in np.arange(self.nproc):
if(irank == self.rank):
if(len(data)):
self.conn = sqlite3.connect(self.db_fn)
self.conn = sqlite3.connect(self.db_fn,
check_same_thread=self.single_threaded_access)
self.conn.executemany('insert into wdb(ds_id, net, sta, loc, cha, st, et, tag) values '
'(?, ?, ?, ?, ?, ?, ?, ?)', data)
print(('\tInserted %d entries on rank %d'%(len(data),
@@ -405,15 +415,17 @@ def decode_tag(tag, type='raw_recording'):

if(self.rank==0):
print('Creating table indices..')
self.conn = sqlite3.connect(self.db_fn)
self.conn = sqlite3.connect(self.db_fn,
check_same_thread=self.single_threaded_access)
self.conn.execute('create index allindex on wdb(net, sta, loc, cha, st, et)')
self.conn.execute('create index netstaindex on netsta(ds_id, net, sta)')
self.conn.commit()
self.conn.close()
print('Done..')
# end if
self.comm.Barrier()
self.conn = sqlite3.connect(self.db_fn)
self.conn = sqlite3.connect(self.db_fn,
check_same_thread=self.single_threaded_access)
# end if

# Load metadata
@@ -431,9 +443,9 @@ def decode_tag(tag, type='raw_recording'):
def get_global_time_range(self, network, station, location=None, channel=None):
query = "select min(st), max(et) from wdb where net='%s' and sta='%s' "%(network, station)

if (location):
if (location is not None):
query += "and loc='%s' "%(location)
if (channel):
if (channel is not None):
query += "and cha='%s' "%(channel)

row = self.conn.execute(query).fetchall()[0]
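For background, check_same_thread is standard sqlite3 behaviour rather than anything repository-specific: with the default True, the connection may only be used by the thread that created it, while False permits cross-thread use and leaves serialization to the caller. A small self-contained sketch:

    import sqlite3
    import threading

    # check_same_thread=False mirrors single_threaded_access=False above:
    # the connection may be used from any thread, but concurrent access
    # should be guarded explicitly.
    conn = sqlite3.connect(':memory:', check_same_thread=False)
    conn.execute("create table wdb(net text, sta text)")
    conn.execute("insert into wdb values ('AU', 'ARMA')")
    conn.commit()

    lock = threading.Lock()

    def stations(net):
        # Serialize access to the shared connection.
        with lock:
            return conn.execute("select sta from wdb where net=?", (net,)).fetchall()

    t = threading.Thread(target=lambda: print(stations('AU')))
    t.start()
    t.join()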
39 changes: 14 additions & 25 deletions seismic/pick_harvester/local/pick_eqt.py
@@ -57,12 +57,6 @@ def setup_logger(name, log_file, level=logging.INFO):
return logger
# end func

def dropBogusTraces(st, sampling_rate_cutoff=5):
badTraces = [tr for tr in st if tr.stats.sampling_rate < sampling_rate_cutoff]

for tr in badTraces: st.remove(tr)
# end func

def getWorkLoad(fds:FederatedASDFDataSet, netsta_list:str,
start_time:UTCDateTime, end_time:UTCDateTime):
"""
@@ -90,16 +84,18 @@ def getWorkLoad(fds:FederatedASDFDataSet, netsta_list:str,
netsta_list.append(netsta)
# end for

if(len(netsta_list) == 0): print('Warning: no stations found to process..')

result_list = []
for netsta in netsta_list:
st = start_time
et = end_time
st = UTCDateTime(start_time)
et = UTCDateTime(end_time)

nc, sc = netsta.split('.')
minSt, maxEt = fds.get_global_time_range(nc, sc)
gSt, gEt = fds.get_global_time_range(nc, sc)

if(st < minSt): st = minSt
if(et > maxEt): et = maxEt
if(st < gSt): st = gSt
if(et > gEt): et = gEt

cTime = st
while(cTime < et):
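The revised getWorkLoad logic above coerces the requested window to UTCDateTime and clamps it to the station's recorded extent before chunking. A standalone sketch of just that step (get_global_time_range is the dataset method used above; the helper itself is illustrative):

    from obspy import UTCDateTime

    def clamp_to_station_extent(fds, netsta, start_time, end_time):
        # Sketch: restrict a requested time window to the data actually
        # available for a station, as done in getWorkLoad above.
        st, et = UTCDateTime(start_time), UTCDateTime(end_time)

        nc, sc = netsta.split('.')
        g_st, g_et = fds.get_global_time_range(nc, sc)

        if (st < g_st): st = g_st
        if (et > g_et): et = g_et
        return st, et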
@@ -418,19 +414,16 @@ def outputConfigParameters():
proc_workload = comm.bcast(proc_workload, root=0)

# ==================================================
# Define output header and open output files
# depending on the mode of operation (fresh/restart)
# Open output files depending on the mode of
# operation (fresh/restart)
# ==================================================
header = '#net, sta, loc, cha, timestamp, probability \n'
ofnp = os.path.join(output_path, 'p_arrivals.%d.txt' % (rank))
ofns = os.path.join(output_path, 's_arrivals.%d.txt' % (rank))
ofhp = None
ofhs = None
if (restart == False):
ofhp = open(ofnp, 'w+')
ofhs = open(ofns, 'w+')
ofhp.write(header)
ofhs.write(header)
else:
ofhp = open(ofnp, 'a+')
ofhs = open(ofns, 'a+')
@@ -445,11 +438,9 @@ def outputConfigParameters():
nc, sc = netsta.split('.')
loc_pref_dict = defaultdict(lambda: None) # ignoring location codes

print('Rank: {}, Processing: {} - {}'.format(rank, st, et))
if (progTracker.increment()):
pass
else:
print('Moving along')
continue
# end if

@@ -490,27 +481,25 @@ def outputConfigParameters():

# Merge results on proc 0
if (rank == 0):
merge_results(output_path)
header = '#net, sta, loc, cha, timestamp, probability \n'
merge_results(output_path, header)
# end if
# end func

def merge_results(output_path):
def merge_results(output_path, header):
search_strings = ['p_arrivals*', 's_arrivals*']
output_fns = ['p_combined.txt', 's_combined.txt']

for ss, ofn in zip(search_strings, output_fns):
files = recursive_glob(output_path, ss)
ofn = open('%s/%s' % (output_path, ofn), 'w+')
ofn.write(header)

data = set()
for i, fn in enumerate(files):
lines = open(fn, 'r').readlines()

if (i == 0):
ofn.write(lines[0])
# end if

for j in range(1, len(lines)):
for j in np.arange(len(lines)):
data.add(lines[j])
# end for

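The reshaped merge step above writes the column header once per combined file and de-duplicates picks gathered from the per-rank files. A simplified sketch of the same pattern (glob is used here in place of the repository's recursive_glob; paths are placeholders):

    import glob
    import os

    def merge_rank_files(output_path, pattern, combined_name, header):
        # Gather per-rank arrival files, drop duplicate lines via a set,
        # and write the header exactly once.
        data = set()
        for fn in glob.glob(os.path.join(output_path, pattern)):
            with open(fn, 'r') as fh:
                data.update(fh.readlines())

        with open(os.path.join(output_path, combined_name), 'w') as ofh:
            ofh.write(header)
            ofh.writelines(sorted(data))  # sorted for deterministic output

    # Example mirroring the P-arrival merge in pick_eqt:
    # merge_rank_files('outputs', 'p_arrivals.*.txt', 'p_combined.txt',
    #                  '#net, sta, loc, cha, timestamp, probability \n')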
