Minor Changes to FederatedASDFDataSet
* Added a function for generating coverage summaries
* Added a Click CLI with options for generating summaries and
  force-reindexing data holdings (see the usage sketch below)
geojunky committed Sep 5, 2023
1 parent 0ff0b4f commit e68ade6
Showing 2 changed files with 95 additions and 35 deletions.
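A minimal usage sketch of the two additions (illustrative only, not part of the commit; the index-file path and network code are placeholders):

# Sketch: programmatic use of the new force_reindex flag and get_coverage method.
# '/path/to/asdf_file_index.txt' is a placeholder for a text file listing ASDF files.
from obspy.core import UTCDateTime
from seismic.ASDFdatabase.FederatedASDFDataSet import FederatedASDFDataSet

# Rebuild the index even if a <sha1>.db file already exists alongside the source list
ds = FederatedASDFDataSet('/path/to/asdf_file_index.txt', force_reindex=True)

# Each coverage row holds (net, sta, loc, cha, lon, lat, min_st, max_et)
for net, sta, loc, cha, lon, lat, min_st, max_et in ds.get_coverage(network='AU'):
    print(net, sta, loc, cha, UTCDateTime(float(min_st)), UTCDateTime(float(max_et)))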
76 changes: 58 additions & 18 deletions seismic/ASDFdatabase/FederatedASDFDataSet.py
@@ -21,16 +21,18 @@
from seismic.ASDFdatabase._FederatedASDFDataSetImpl import _FederatedASDFDataSetImpl
from seismic.misc import rtp2xyz, setup_logger
from obspy.core import UTCDateTime
import click

class FederatedASDFDataSet():
def __init__(self, asdf_source, logger=None,
def __init__(self, asdf_source, force_reindex=False, logger=None,
single_item_read_limit_in_mb=1024,
single_threaded_access=True):
"""
Initializer for FederatedASDFDataSet.
:param asdf_source: Path to a text file containing a list of ASDF files. \
Entries can be commented out with '#'
:param force_reindex: Force reindex even if a preexisting db file is found
:param logger: logger instance
:param single_item_read_limit_in_mb: buffer size for Obspy reads
:param single_threaded_access: By default, data are read via unthreaded MPI-processes.
@@ -43,7 +45,7 @@ def __init__(self, asdf_source, logger=None,
self._earth_radius = 6371 # km

# Instantiate implementation class
self.fds = _FederatedASDFDataSetImpl(asdf_source, logger=logger,
self.fds = _FederatedASDFDataSetImpl(asdf_source, force_reindex=force_reindex, logger=logger,
single_item_read_limit_in_mb=single_item_read_limit_in_mb,
single_threaded_access=single_threaded_access)

@@ -257,26 +259,64 @@ def find_gaps(self, network=None, station=None, location=None,
"""
return self.fds.find_gaps(network, station, location, channel, start_date_ts, end_date_ts, min_gap_length)
# end func
# end class

if __name__ == "__main__":
"""
How to Run Example::
def get_coverage(self, network=None):
"""
Generates coverage for the entire data holdings for a selected network.
:param network: network code
:return: Numpy record array with columns: net, sta, loc, cha, lon, lat,
min_st (start timestamp), max_et (end timestamp)
"""

python ASDFdatabase/FederatedASDFDataSet.py /Datasets/asdf_file_index.txt
rows = self.fds.get_coverage(network=network)
return rows
# end func
# end class

Upon success, a db file will be created: /Datasets/f374ca9e7dd8abd2a1d58575e0d55520f30ffc23.db
CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
@click.command(context_settings=CONTEXT_SETTINGS)
@click.argument('asdf-source', required=True,
type=click.Path(exists=True))
@click.option('--force-reindex', default=False, is_flag=True,
help='Force reindex, even if a preexisting database is found')
@click.option('--generate-summary', default=False, is_flag=True,
help='Generate coverage and data availability summaries')
def process(asdf_source, force_reindex, generate_summary):
"""
ASDF_SOURCE: Text file containing a list of paths to ASDF files
"""
import sys
from seismic.ASDFdatabase.FederatedASDFDataSet import FederatedASDFDataSet

if len(sys.argv) < 2:
print("******** USAGE: python3 %s %s **********"% (sys.argv[0], "asdf_file_list_txt"))
sys.exit(1)
ofn = 'FederatedASDFDataSet.Indexer.log'
logger = setup_logger('', ofn)
ds = FederatedASDFDataSet(asdf_source, force_reindex=force_reindex, logger=logger)

if(generate_summary):
if(ds.fds.rank == 0):
ts = UTCDateTime().strftime("%Y-%m-%dT%H.%M.%S")
logger.info('Generating coverage summary..')
ofn = 'FederatedASDFDataSet.Summary.{}.txt'.format(ts)

with open(ofn, 'w') as fh:
fh.write('# net, sta, loc, cha, lon, lat, min_starttime, max_endtime, duration_months\n')

rows = ds.get_coverage()
for row in rows:
net, sta, loc, cha, lon, lat, min_st, max_et = row
duration_months = (max_et - min_st) / (86400 * 30)

line = '{},{},{},{},{:3.4f},{:3.4f},{},{},{:5.3f}\n'.\
format(net, sta, loc, cha, lon, lat,
UTCDateTime(min_st).strftime('%Y-%m-%dT%H:%M:%S'),
UTCDateTime(max_et).strftime('%Y-%m-%dT%H:%M:%S'),
duration_months)
fh.write(line)
# end for
# end with
# end if
# end if
logger.info('Done..')
# end func

asdf_file_list = sys.argv[1]
ts = UTCDateTime().strftime("%Y-%m-%d.T%H.%M.%S")
ofn = 'FederatedASDFDataSet.Indexer.{}.log'.format(ts)
logger = setup_logger('', ofn)
ds = FederatedASDFDataSet(asdf_file_list, logger=logger)
if __name__ == "__main__":
process()
# end func
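For reference, the new command can also be exercised without a shell through click's test runner; a minimal sketch assuming the index file exists (roughly equivalent to running python seismic/ASDFdatabase/FederatedASDFDataSet.py <asdf-source> --force-reindex --generate-summary):

# Sketch: invoking the new Click entry point in-process via click.testing.
# '/path/to/asdf_file_index.txt' is a placeholder; click.Path(exists=True)
# requires that the file actually exists.
from click.testing import CliRunner
from seismic.ASDFdatabase.FederatedASDFDataSet import process

runner = CliRunner()
result = runner.invoke(process, ['/path/to/asdf_file_index.txt',
                                 '--force-reindex', '--generate-summary'])
print(result.exit_code)
print(result.output)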
54 changes: 37 additions & 17 deletions seismic/ASDFdatabase/_FederatedASDFDataSetImpl.py
@@ -57,12 +57,13 @@ def split_list_by_timespan(l, n):
# end func

class _FederatedASDFDataSetImpl():
def __init__(self, asdf_source, logger=None,
def __init__(self, asdf_source, force_reindex=False, logger=None,
single_item_read_limit_in_mb=1024,
single_threaded_access=True):
"""
:param asdf_source: path to a text file containing a list of ASDF files:
Entries can be commented out with '#'
:param force_reindex: Force reindex even if a preexisting db file is found
:param logger: logger instance
:param single_item_read_limit_in_mb: buffer size for Obspy reads
:param single_threaded_access: By default, data are read via unthreaded MPI-processes.
@@ -83,6 +84,8 @@ def __init__(self, asdf_source, logger=None,
if isinstance(asdf_source, str):
self.asdf_source = asdf_source
self.source_sha1 = hashlib.sha1(open(self.asdf_source).read().encode('utf-8')).hexdigest()
self.db_fn = os.path.join(os.path.dirname(self.asdf_source), self.source_sha1 + '.db')

fileContents = list(filter(len, open(self.asdf_source).read().splitlines()))

# collate file names
@@ -110,9 +113,18 @@ def __init__(self, asdf_source, logger=None,
# end if
# end func

# Remove preexisting db if force_reindex is True
if (force_reindex):
if(self.rank == 0):
if(os.path.exists(self.db_fn)):
os.remove(self.db_fn)
# end if
# end if
# end if
self.comm.Barrier()

# Create database
self.conn = None
self.db_fn = os.path.join(os.path.dirname(self.asdf_source), self.source_sha1 + '.db')
self.masterinv = None
self.create_database()
self._load_corrections()
@@ -305,6 +317,7 @@ def decode_tag(tag, type='raw_recording'):
endtime = UTCDateTime(et).timestamp

if((endtime - starttime) == 0):
# Filtering out zero-length traces
return None
else:
return nc, sc, lc, cc, starttime, endtime
@@ -781,6 +794,28 @@ def find_gaps(self, network=None, station=None, location=None,
return result
# end func

def get_coverage(self, network=None):
# Gathers coverage for the data holdings: for each unique (net, sta, loc, cha)
# combination, returns station coordinates along with min(start) and max(end)
# timestamps, optionally restricted to a given network code
query = """
select w.net, w.sta, w.loc, w.cha, n.lon, n.lat, min(w.st), max(w.et)
from wdb as w, netsta as n where w.net=n.net and w.sta=n.sta and
w.net in (select distinct net from netsta) and
w.sta in (select distinct sta from netsta) and
w.loc in (select distinct loc from wdb) and
w.cha in (select distinct cha from wdb)
"""
if(network): query += ' and w.net="{}"'.format(network)
query += " group by w.net, w.sta, w.loc, w.cha; "

rows = self.conn.execute(query).fetchall()
array_dtype = [('net', 'U10'), ('sta', 'U10'),
('loc', 'U10'), ('cha', 'U10'),
('lon', 'float'), ('lat', 'float'),
('min_st', 'float'), ('max_et', 'float')]
rows = np.array(rows, dtype=array_dtype)

return rows
# end func

def cleanup(self):
for i, ds in enumerate(self.asdf_datasets):
# if self.logger:
@@ -791,18 +826,3 @@ def cleanup(self):
self.conn.close()
# end func
# end class

if __name__=="__main__":
fn = os.path.join('/tmp', 'test.log')
logger = setup_logger('main', fn)

# fds = _FederatedASDFDataSetImpl('/g/data/ha3/rakib/tmp/a.txt', logger)
fds = _FederatedASDFDataSetImpl("/g/data/ha3/Passive/SHARED_DATA/Index/asdf_files.txt", logger)
s = fds.get_waveforms('AU', 'QIS', '', 'BHZ', '2015-06-01T00:00:00', '2015-06-02T00:06:00') # cannot use wildcard *
print(s)
# select * from wdb where net='AU' and sta='QIS' and loc='' and cha='BHZ' and et>=1433116800.000000 and st<=1433203560.000000
# 2 Trace(s) in Stream:
# AU.QIS..BHZ | 2015-05-31T23:59:59.994500Z - 2015-06-01T23:59:59.994500Z | 40.0 Hz, 3456001 samples
# AU.QIS..BHZ | 2015-06-01T23:59:55.769500Z - 2015-06-02T00:05:59.994500Z | 40.0 Hz, 14570 samples

print(s)
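The coverage query above reduces, for each unique (net, sta, loc, cha) combination, to a min(st)/max(et) aggregation; a standalone illustration of that pattern against an in-memory SQLite table (synthetic values and a simplified schema, not the project's actual database):

# Sketch: the group-by min/max pattern behind get_coverage, on synthetic data.
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('create table wdb (net text, sta text, loc text, cha text, st real, et real)')
conn.executemany('insert into wdb values (?, ?, ?, ?, ?, ?)',
                 [('AU', 'QIS', '', 'BHZ', 100.0, 200.0),
                  ('AU', 'QIS', '', 'BHZ', 250.0, 400.0),
                  ('AU', 'ARMA', '', 'BHZ', 150.0, 300.0)])
rows = conn.execute('select net, sta, loc, cha, min(st), max(et) from wdb '
                    'group by net, sta, loc, cha').fetchall()
print(rows)  # e.g. [('AU', 'ARMA', '', 'BHZ', 150.0, 300.0), ('AU', 'QIS', '', 'BHZ', 100.0, 400.0)]
conn.close()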
