Skip to content

Commit

Permalink
Data Exporter for Salvus
Browse files Browse the repository at this point in the history
* Added a draft script for exporting data that can be directly ingested
  by Salvus
  • Loading branch information
geojunky committed Feb 14, 2023
1 parent bc37783 commit bc9189e
Showing 1 changed file with 281 additions and 0 deletions.
281 changes: 281 additions & 0 deletions seismic/ASDFdatabase/asdf2salvus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
#!/bin/env python
"""
Description:
Small utility for exporting data from FedASDF for salvus
References:
CreationDate: 07/02/2023
Developer: rakib.hassan@ga.gov.au
Revision History:
LastUpdate: 07/02/23 RH
LastUpdate: dd/mm/yyyy Who Optional description
"""

import os, shutil

from collections import defaultdict
import numpy as np
from obspy import Stream, UTCDateTime, read_inventory, Inventory
import pyasdf
from seismic.ASDFdatabase.FederatedASDFDataSet import FederatedASDFDataSet
import click
from shapely.geometry.polygon import Polygon, Point
from tqdm import tqdm
import json

class Domain():
    """
    Represents a rectangular lat/lon region read from a Salvus domain file
    (json format). Used to cull stations falling outside the simulation
    domain.
    """
    def __init__(self, domain_json_file: str):
        """
        @param domain_json_file: path to a Salvus domain file in json format,
               expected to contain a 'lat_lon_box' entry with keys
               'latitude_center', 'latitude_extent', 'longitude_center' and
               'longitude_extent'
        """
        # use a context manager so the file handle is not leaked
        with open(domain_json_file, 'r') as fh:
            dom = json.load(fh)
        # end with

        lat_center, lat_extent, lon_center, lon_extent = \
            dom['lat_lon_box']['latitude_center'], \
            dom['lat_lon_box']['latitude_extent'], \
            dom['lat_lon_box']['longitude_center'], \
            dom['lat_lon_box']['longitude_extent']

        c = [lon_center, lat_center]
        e = [lon_extent, lat_extent]

        # corner coordinates (lon, lat) of the bounding box, going
        # anticlockwise from the top-left corner
        coords = ((c[0] - e[0], c[1] + e[1]),
                  (c[0] - e[0], c[1] - e[1]),
                  ((c[0] + e[0]), c[1] - e[1]),
                  ((c[0] + e[0]), c[1] + e[1]))
        self.coords = np.array(coords)
        self.bounding_polygon = Polygon(coords)
    # end func

    def contains(self, lon: float, lat: float) -> bool:
        """
        @param lon: longitude in degrees
        @param lat: latitude in degrees
        @return: True if (lon, lat) falls strictly within the bounding polygon
        """
        p = Point((lon, lat))

        return self.bounding_polygon.contains(p)
    # end func
# end class

def get_validated_waveform(fds: FederatedASDFDataSet,
                           net, sta, loc, cha, st, et):
    """
    Fetches waveform data for a given channel and time-range, keeping only
    traces tagged 'raw_recording', and rejects gappy data.

    @param fds: FederatedASDFDataSet instance to fetch waveforms from
    @param net: network code
    @param sta: station code
    @param loc: location code
    @param cha: channel code
    @param st: start-time (UTCDateTime)
    @param et: end-time (UTCDateTime)
    @return: obspy Stream, trimmed to [st, et] and merged; an empty Stream
             is returned if gaps remain after merging
    """
    stream = fds.get_waveforms(net, sta, loc, cha,
                               st, et)
    if stream:
        try:
            # keep only traces carrying the 'raw_recording' asdf tag
            stream = Stream([tr for tr in stream \
                             if tr.stats.asdf.tag == \
                             'raw_recording'])
        except AttributeError:
            # traces lack asdf metadata (tr.stats.asdf missing): keep all
            pass
        # end try
    # end if

    stream.trim(st, et)
    stream.merge()

    # more than one trace surviving a merge implies gappy data: discard
    if (len(stream) > 1):
        stream = Stream([])
        return stream
    # end if

    # masked arrays that carry no actually-masked samples are demoted to
    # plain ndarrays; streams with genuinely masked samples pass through
    # unchanged
    if any(isinstance(tr.data, np.ma.masked_array) for tr in stream):
        has_masked_values = any((isinstance(tr.data, np.ma.masked_array) and
                                 np.any(tr.data.mask)) for tr in stream)

        if (not has_masked_values):
            for tr in stream: tr.data = np.array(tr.data)
        # end if
    # end if

    return stream
# end func

DEBUG = True
def extract_data_for_event(fds: FederatedASDFDataSet,
                           domain: Domain, event: dict,
                           inventory: Inventory,
                           output_folder, data_name='raw_data',
                           receiver_fields='displacement',
                           seconds_before=240, seconds_after=3600):
    """
    Exports waveform data for a single event into the folder hierarchy
    expected by Salvus, writing an ASDF file (receivers.h5) along with a
    receivers.json file.

    @param fds: FederatedASDFDataSet instance to extract waveforms from
    @param domain: Domain instance used to cull stations outside the
           simulation domain
    @param event: dict containing a single event, keyed by event-name
    @param inventory: obspy Inventory bearing instrument responses;
           channels without a response are skipped
    @param output_folder: base output folder
    @param data_name: name of data folder within Salvus' folder hierarchy
    @param receiver_fields: receiver field type (e.g. 'displacement')
    @param seconds_before: start of data-window before origin-time
    @param seconds_after: end of data-window after origin-time
    @return: None
    """

    assert len(event) == 1, 'Invalid event dict {}. Must contain a single event. Aborting..'.format(event)

    tag = 'raw_recording'
    nets = {'AU'}  # networks processed when DEBUG is active

    # make folders
    ename = list(event.keys())[0]
    edict = event[ename]
    h5_dir = os.path.join(output_folder,
                          'EVENTS/{}/WAVEFORM_DATA/EXTERNAL/{}'.format(ename, data_name))
    os.makedirs(h5_dir, exist_ok=True)

    h5_ofn = os.path.join(h5_dir, 'receivers.h5')
    receivers_ofn = os.path.join(output_folder, 'EVENTS/{}/receivers.json'.format(ename))

    ods = pyasdf.ASDFDataSet(h5_ofn, mode='w')

    oinv = None  # accumulated inventory of responses actually exported
    otime = UTCDateTime(edict[0]['arguments']['reference_time_utc_string'])

    st, et = otime - seconds_before, otime + seconds_after
    rows = fds.get_stations(st, et)

    receivers_dict = defaultdict(dict)
    pbar = tqdm(total=len(rows), desc=ename)
    for row in rows:
        net, sta, loc, cha, lon, lat, elev = row

        if (not domain.contains(lon, lat)): continue
        if (DEBUG and net not in nets): continue

        stream = get_validated_waveform(fds, net, sta, loc,
                                        cha, st, et)

        pbar.update()
        pbar.set_description(desc='{}: [{}.{}]'.format(ename, net, sta))
        if (len(stream)):
            # check instrument-response availability
            seed_id = '{}.{}.{}.{}'.format(net, sta, loc, cha)

            try:
                resp = inventory.get_response(seed_id, stream[0].stats.starttime)
            except Exception as e:
                # no response found for this channel: skip it
                continue
            # end try

            if(resp):
                ods.add_waveforms(stream, tag)

                receivers_dict['{}.{}.{}'.format(net, sta, loc)] = \
                    {"class_name": "salvus.flow.simple_config.receiver.seismology.SideSetPoint3D",
                     "salvus_version": "0.12.8",
                     "arguments": {
                         "depth_in_m": 0.0,
                         "radius_of_sphere_in_m": 6371000.0,
                         "network_code": net,
                         "location_code": loc,
                         "side_set_name": "r1",
                         "latitude": lat,
                         "longitude": lon,
                         "station_code": sta,
                         "fields": [receiver_fields]}}

                # bugfix: this originally read 'oinv in None', which raises
                # TypeError on first use
                if(oinv is None): oinv = resp
                else: oinv += resp
            # end if

            #if (DEBUG): break
        # end if
    # end for
    pbar.close()

    # oinv remains None if no station within the domain had both data and a
    # response -- guard against passing None to pyasdf
    if(oinv is not None): ods.add_stationxml(oinv)
    with open(receivers_ofn, 'w+') as fh:
        json.dump(receivers_dict, fh, indent=4)
    # end with
    del ods
# end func

CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
@click.command(context_settings=CONTEXT_SETTINGS)
@click.argument('asdf-source', required=True,
                type=click.Path(exists=True))
@click.argument('salvus-domain-file', required=True,
                type=click.Path(exists=True))
@click.argument('salvus-events-file', required=True,
                type=click.Path(exists=True))
@click.argument('response-stationxml', required=True,
                type=click.Path(exists=True))
@click.argument('output-folder', required=True,
                type=click.Path(exists=True))
@click.option('--seconds-before', type=float, default=240, show_default=True,
              help="Start of data-window before origin-time")
@click.option('--seconds-after', type=float, default=240, show_default=True,
              help="End of data-window after origin-time")
@click.option('--data-name', type=str, default='raw_data', show_default=True,
              help="Name of data folder within Salvus' expected folder hierarchy")
@click.option('--receiver-fields', type=str, default='displacement', show_default=True,
              help="Receiver field type to be recorded (e.g. displacement)")
def process(asdf_source, salvus_domain_file, salvus_events_file,
            response_stationxml, output_folder,
            seconds_before, seconds_after, data_name, receiver_fields):
    """
    ASDF_SOURCE: Path to text file containing paths to ASDF files\n
    SALVUS_DOMAIN_FILE: Path to Salvus domain file in json format\n
    SALVUS_EVENTS_FILE: Path to Salvus events file in json format\n
    RESPONSE_STATIONXML: Path to stationXML file containing instrument responses\n
    OUTPUT_FOLDER: Output folder \n
    """

    fds = None
    dom = None
    events = None
    inv = None

    # each input is loaded under its own try-block so that a failure is
    # reported against the offending file; SystemExit is used instead of
    # 'assert 0', which is silently stripped when python runs with -O
    try:
        fds = FederatedASDFDataSet(asdf_source)
    except Exception as e:
        print(str(e))
        raise SystemExit('Failed to instantiate FederatedASDFDataSet with input file: {}'.format(asdf_source))
    # end try

    try:
        print('Reading Salvus domain file: {}'.format(salvus_domain_file))
        dom = Domain(salvus_domain_file)
    except Exception as e:
        print(str(e))
        raise SystemExit('Failed to instantiate Domain with input file: {}'.format(salvus_domain_file))
    # end try

    try:
        print('Reading Salvus events file: {}'.format(salvus_events_file))
        with open(salvus_events_file, 'r') as fh:
            events = json.load(fh)
        # end with
    except Exception as e:
        print(str(e))
        raise SystemExit('Failed to load Salvus events file: {}'.format(salvus_events_file))
    # end try

    try:
        print('Reading inventory file with responses: {}'.format(response_stationxml))
        inv = read_inventory(response_stationxml)
    except Exception as e:
        print(str(e))
        raise SystemExit('Failed to read inventory file: {}'.format(response_stationxml))
    # end try

    # extract data for all events
    for ek, e in tqdm(events.items(), desc='Events: '):
        extract_data_for_event(fds, dom, {ek: e}, inv, output_folder, data_name,
                               receiver_fields,
                               seconds_before, seconds_after)
        if DEBUG: break
    # end for

    # copy salvus-events to output folder
    shutil.copyfile(salvus_events_file, os.path.join(output_folder, 'EVENTS/event_store.json'))
# end func

# script entry point
if __name__ == '__main__':
    process()
# end if

0 comments on commit bc9189e

Please sign in to comment.