Script to Find Gaps in Data Archive (#235)
* Added a new script to find and report gaps in the data archive
* Resolved a major issue in _FederatedASDFDataSetImpl.py where the
  size limits of MPI gather calls were being exceeded after the recent
  addition of several years' worth of permanent data; a sketch of the
  new per-rank write pattern follows the changed-files summary below
geojunky committed Jan 27, 2022
1 parent 533daba commit 0e1fd29
Showing 2 changed files with 159 additions and 11 deletions.
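
The fix below serializes database writes across MPI ranks: rank 0 creates the schema, then each rank takes its turn inserting its locally indexed rows, with a barrier keeping writers from contending for the SQLite lock. A minimal, standalone sketch of that pattern (assumes mpi4py; the database path and the make_rows() helper are illustrative placeholders, not names from the repository):

    import sqlite3
    from mpi4py import MPI

    DB_FN = 'waveform_index.sqlite'  # hypothetical database path

    def make_rows(rank):
        # Stand-in for each rank's share of the indexing work; the real code
        # extracts (ds_id, net, sta, loc, cha, st, et, tag) tuples from ASDF files.
        return [(rank, 'AU', 'STA%02d' % rank, '', 'BHZ', 0., 86400., 'raw_recording')]

    comm = MPI.COMM_WORLD
    rank, nproc = comm.Get_rank(), comm.Get_size()

    if rank == 0:
        conn = sqlite3.connect(DB_FN)
        conn.execute('create table if not exists '
                     'wdb(ds_id, net, sta, loc, cha, st, et, tag)')
        conn.commit()
        conn.close()
    comm.Barrier()  # make sure the schema exists before anyone writes

    data = make_rows(rank)
    for irank in range(nproc):
        if irank == rank and len(data):
            conn = sqlite3.connect(DB_FN)  # short-lived, per-rank connection
            conn.executemany('insert into wdb values (?, ?, ?, ?, ?, ?, ?, ?)', data)
            conn.commit()
            conn.close()
        # every rank reaches the barrier on each iteration, so exactly one
        # rank writes at a time and no gather of the full payload is needed
        comm.Barrier()

Run with, e.g., mpirun -np 4 python sketch.py; each rank's rows land in the shared database without ever passing through rank 0.
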
32 changes: 21 additions & 11 deletions seismic/ASDFdatabase/_FederatedASDFDataSetImpl.py
@@ -357,11 +357,13 @@ def decode_tag(tag, type='raw_recording'):
'(?, ?, ?, ?, ?)', metadatalist)
self.conn.execute('insert into masterinv(inv) values(?)',
[cPickle.dumps(masterinv, cPickle.HIGHEST_PROTOCOL)])
self.conn.commit()
self.conn.close()
# end if

tagsCount = 0
for ids, ds in enumerate(self.asdf_datasets):
print(('Creating index for %s..' % (self.asdf_file_names[ids])))
if(self.rank==0): print(('Indexing %s..' % (os.path.basename(self.asdf_file_names[ids]))))

keys = list(ds.get_all_coordinates().keys())
keys = split_list(keys, self.nproc)
@@ -381,24 +383,32 @@ def decode_tag(tag, type='raw_recording'):
# end for
# end for

data = self.comm.gather(data, root=0)
if(self.rank==0):
data = [item for sublist in data for item in sublist]
self.conn.executemany('insert into wdb(ds_id, net, sta, loc, cha, st, et, tag) values '
'(?, ?, ?, ?, ?, ?, ?, ?)', data)
print(('Inserted %d entries on rank %d'%(len(data), self.rank)))
tagsCount += len(data)
for irank in np.arange(self.nproc):
if(irank == self.rank):
if(len(data)):
self.conn = sqlite3.connect(self.db_fn)
self.conn.executemany('insert into wdb(ds_id, net, sta, loc, cha, st, et, tag) values '
'(?, ?, ?, ?, ?, ?, ?, ?)', data)
print(('\tInserted %d entries on rank %d'%(len(data),
self.rank)))
tagsCount += len(data)
self.conn.commit()
self.conn.close()
# end if
# end if

self.comm.Barrier()
# end for
# end for

if(self.rank==0):
print('Creating table indices..')
self.conn = sqlite3.connect(self.db_fn)
self.conn.execute('create index allindex on wdb(net, sta, loc, cha, st, et)')
self.conn.execute('create index netstaindex on netsta(ds_id, net, sta)')
self.conn.commit()
print(('Created database on rank %d for %d waveforms (%5.2f MB)' % \
(self.rank, tagsCount, round(psutil.Process().memory_info().rss / 1024. / 1024., 2))))

self.conn.close()
print('Done..')
# end if
self.comm.Barrier()
self.conn = sqlite3.connect(self.db_fn)
138 changes: 138 additions & 0 deletions seismic/ASDFdatabase/find_data_gaps.py
@@ -0,0 +1,138 @@
#!/usr/bin/env python
"""
Description:
Find data-gaps in ASDF archives, by network.
References:
CreationDate: 10/01/22
Developer: rakib.hassan@ga.gov.au
Revision History:
LastUpdate: 10/01/22 RH
LastUpdate: dd/mm/yyyy Who Optional description
"""

from collections import defaultdict
import numpy as np
from obspy import UTCDateTime
from seismic.ASDFdatabase.FederatedASDFDataSet import FederatedASDFDataSet
from tqdm import tqdm
import click

def dump_gaps(asdf_source, network, start_date, end_date, min_gap_length, output_filename):
ds = FederatedASDFDataSet(asdf_source)
conn = ds.fds.conn

query = 'select st, et, net, sta, loc, cha from wdb where net="{}"'.format(network)
if(start_date and end_date):
query += ' and st>={} and et<={}'.format(start_date, end_date)
# end if
query += ' order by st, et'

rows = conn.execute(query).fetchall()
rows = np.array(rows, dtype=[('st', 'float'), ('et', 'float'),
('net', 'object'),
('sta', 'object'),
('loc', 'object'),
('cha', 'object')])


tree = lambda: defaultdict(tree)
nested_dict = tree()
for i in tqdm(np.arange(rows.shape[0])):
net = rows['net'][i]
sta = rows['sta'][i]
loc = rows['loc'][i]
cha = rows['cha'][i]
st = rows['st'][i]
et = rows['et'][i]

if (type(nested_dict[net][sta][loc][cha]) == defaultdict):
nested_dict[net][sta][loc][cha] = []
# end if

nested_dict[net][sta][loc][cha].append([st, et])
# end for

fh = open(output_filename, 'w+')
for net in nested_dict.keys():
for sta in nested_dict[net].keys():
for loc in nested_dict[net][sta].keys():
for cha in nested_dict[net][sta][loc].keys():
arr = nested_dict[net][sta][loc][cha]
if (len(arr)):
arr = np.array(arr)

st = arr[:, 0]
et = arr[:, 1]
assert np.allclose(np.array(sorted(st)), st), 'Start-times array not sorted!'
gaps = np.argwhere((st[1:] - et[:-1]) >= min_gap_length)

if (len(gaps)):
for i, idx in enumerate(gaps):
idx = idx[0]

fh.write('{} {} {} {} {} {}\n'.format(net, sta,
loc if len(loc) else '--',
cha, UTCDateTime(et[idx]),
UTCDateTime(st[idx + 1])))
# end for
# end if
# end if
# end for
# end for
# end for
# end for
fh.close()
# end func
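
The tree = lambda: defaultdict(tree) idiom above builds an autovivifying nested dictionary, so the four-level net/sta/loc/cha structure needs no explicit setup; the type check against defaultdict is what converts a freshly created leaf into a plain list before windows are appended. A minimal illustration (the station code is made up):

    from collections import defaultdict

    tree = lambda: defaultdict(tree)
    nested = tree()
    nested['AU']['STA01']['']['BHZ'] = []                   # levels created on access
    print(isinstance(nested['AU']['STA01'], defaultdict))   # True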


CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
@click.command(context_settings=CONTEXT_SETTINGS)
@click.argument('asdf-source', required=True,
type=click.Path(exists=True))
@click.argument('network-name', required=True,
type=str)
@click.argument('output-filename', required=True,
type=click.Path(dir_okay=False))
@click.option('--start-date', default=None,
help="Start date-time in UTC format. If specified, 'end-date' must also be specified; "
"otherwise this parameter is ignored.",
type=str)
@click.option('--end-date', default=None,
help="End date-time in UTC format. If specified, 'start-date' must also be specified; "
"otherwise this parameter is ignored.",
type=str)
@click.option('--min-length', default=86400, type=float,
help="Minimum length of gaps in seconds to report")
def process(asdf_source, network_name, output_filename, start_date, end_date, min_length):
"""
ASDF_SOURCE: Path to text file containing paths to ASDF files\n
NETWORK_NAME: Network name \n
OUTPUT_FILENAME: Output file-name\n
Example usage:

    python find_data_gaps.py asdf_files.txt AU gaps.txt --min-length 3600
"""

try:
start_date = UTCDateTime(start_date).timestamp if start_date else None
end_date = UTCDateTime(end_date).timestamp if end_date else None
min_length = float(min_length)
except Exception as e:
print(str(e))
assert 0, 'Invalid input'
# end try

assert min_length > 0, '--min-length must be > 0'

dump_gaps(asdf_source, network_name, start_date, end_date, min_length, output_filename)
# end func

if (__name__ == '__main__'):
process()
# end if
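
For reference, the gap test in dump_gaps() reduces to simple arithmetic on each channel's sorted windows: a gap is reported wherever the next start time exceeds the previous end time by at least min_gap_length. A toy illustration with made-up epoch timestamps:

    import numpy as np
    from obspy import UTCDateTime

    min_gap_length = 86400.                    # one day, as for --min-length
    st = np.array([0., 90000., 400000.])       # sorted window start times (s)
    et = np.array([86000., 300000., 500000.])  # corresponding end times (s)

    gaps = np.argwhere((st[1:] - et[:-1]) >= min_gap_length)
    for idx in gaps.flatten():
        print(UTCDateTime(et[idx]), '->', UTCDateTime(st[idx + 1]))

Only the 300000 -> 400000 break (100000 s) is reported; the 4000 s break between the first two windows falls below the threshold.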
