Salvus exporter (#254)
* Exporter is now able to augment data from IRIS

* Applying more stringent quality control to extracted waveforms

* Parallelized the script: events are split across MPI ranks

* asdf2salvus.py can now be run on Gadi ARE desktops in parallel, with
  access to the external network.
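
The parallelization splits event keys across MPI ranks with `split_list`, and each rank extracts data only for its own share. A minimal sketch of that pattern with stand-in event payloads (the real script, in the diff below, feeds `FederatedASDFDataSet` queries and click options into the loop):

```python
# Hedged sketch of the rank-based event split used by asdf2salvus.py.
from mpi4py import MPI
from seismic.ASDFdatabase._FederatedASDFDataSetImpl import split_list

comm = MPI.COMM_WORLD
rank, nproc = comm.Get_rank(), comm.Get_size()

events = {'event_0001': {}, 'event_0002': {}}  # stand-in payloads
proc_ekeys = split_list(list(events.keys()), nproc)[rank]  # this rank's share
for ek in proc_ekeys:
    # each rank writes EVENTS/<event>/receivers.h5 for its own events only
    print('rank {} processing {}'.format(rank, ek))
```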
geojunky committed Jul 5, 2023
1 parent 15be567 commit 8a84854
Showing 1 changed file with 196 additions and 26 deletions.
222 changes: 196 additions & 26 deletions seismic/ASDFdatabase/asdf2salvus.py
@@ -18,14 +18,18 @@
from collections import defaultdict
import numpy as np
from obspy import Stream, UTCDateTime, read_inventory, Inventory
+from obspy.clients.fdsn.client import Client
import pyasdf
+from mpi4py import MPI
from seismic.ASDFdatabase.FederatedASDFDataSet import FederatedASDFDataSet
+from seismic.ASDFdatabase._FederatedASDFDataSetImpl import split_list
import click
from shapely.geometry.polygon import Polygon, Point
from tqdm import tqdm
import json
+import tempfile

-class Domain():
+class StandardDomain():
def __init__(self, domain_json_file:str):
dom = json.load(open(domain_json_file, 'r'))
lat_center, lat_extent, lon_center, lon_extent = \
@@ -52,6 +56,21 @@ def contains(self, lon:float, lat:float):
# end func
# end class

class Domain():
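# Unlike the box-based StandardDomain above, Domain keeps the outer ring of
# the first GeoJSON feature as a shapely Polygon and tests station
# coordinates against it, e.g. (file name hypothetical):
#   dom = Domain('salvus_domain.json')
#   dom.contains(133.5, -25.0)  # True if the point lies inside the polygon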
def __init__(self, domain_json_file:str):
dom = json.load(open(domain_json_file, 'r'))

coords = dom['features'][0]['geometry']['coordinates'][0]
self.bounding_polygon = Polygon(coords)
# end func

def contains(self, lon:float, lat:float):
p = Point((lon, lat))

return self.bounding_polygon.contains(p)
# end func
# end class

def get_validated_waveform(fds: FederatedASDFDataSet,
net, sta, loc, cha, st, et):
stream = fds.get_waveforms(net, sta, loc, cha,
@@ -67,11 +86,20 @@ def get_validated_waveform(fds: FederatedASDFDataSet,
# end if

stream.trim(st, et)
-stream.merge()

+try:
+stream.merge()
+except:
+stream = Stream([])
+return stream
+# end try

if (len(stream) > 1):
stream = Stream([])
return stream
+elif((stream[0].stats.endtime - stream[0].stats.starttime) < (et - st)):
+stream = Stream([])
+return stream
# end if
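# Quality control above: a stream that cannot be merged, merges into more
# than one trace (i.e. has gaps), or spans less than the requested [st, et]
# window is rejected by returning an empty Stream.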

if any(isinstance(tr.data, np.ma.masked_array) for tr in stream):
@@ -90,7 +118,7 @@ def has_masked_values(data_stream):
# end func

if (has_masked_values(stream)):
-pass
+stream = Stream([])
else:
for tr in stream: tr.data = np.array(tr.data)
# end if
@@ -140,7 +168,7 @@ def is_preferred_component(cha: str):
lambda: defaultdict(lambda: defaultdict(list))))

for irow, row in enumerate(rows):
-net, sta, loc, cha, lon, lat, elev = row
+net, sta, loc, cha, _, _, _ = row

if (cha[0] == 'S' and not include_sband): continue
if (not is_preferred_component(cha)): continue
@@ -178,7 +206,7 @@ def is_preferred_component(cha: str):
# sanity checks
for nk, nv in itype_dict.items():
for sk, sv in itype_dict[nk].items():
-assert len(itype_dict[nk][sk]) == 1, 'Failed to remove multiple location codes ' \
+assert len(itype_dict[nk][sk]) <= 1, 'Failed to remove multiple location codes ' \
'found for station {}.{}:{}'.format(nk, sk,
itype_dict[nk][sk].keys())
# end for
@@ -249,7 +277,7 @@ def remove_comments(iinv: Inventory) -> Inventory:
h5_ofn = os.path.join(h5_dir, 'receivers.h5')
receivers_ofn = os.path.join(output_folder, 'EVENTS/{}/receivers.json'.format(ename))

-ods = pyasdf.ASDFDataSet(h5_ofn, mode='w')
+ods = pyasdf.ASDFDataSet(h5_ofn, mode='w', mpi=False)
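# mpi=False stops pyasdf from attempting parallel HDF5 I/O on this file;
# since events are split across ranks, each receivers.h5 is only ever
# written by a single rank.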

oinv = None
otime = UTCDateTime(edict[0]['arguments']['reference_time_utc_string'])
@@ -258,6 +286,7 @@ def remove_comments(iinv: Inventory) -> Inventory:
rows = fds.get_stations(st, et)
rows = filter_channels_locations(rows) # filter out unwanted channels and band-codes

data_added_set = set()
receivers_dict = defaultdict(dict)
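# receivers_dict accumulates one Salvus receiver definition per net.sta.loc
# (the serialized salvus.flow.simple_config receivers named in class_name
# below); it is written to receivers.json alongside receivers.h5.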
pbar = tqdm(total=len(rows), desc=ename)
for row in rows:
@@ -305,6 +334,8 @@ def remove_comments(iinv: Inventory) -> Inventory:
"longitude": lon,
"station_code": sta,
"fields": [receiver_fields]}}

data_added_set.add(seed_id)
except Exception as e:
print('Failed to add inventory/waveform with error: {}. Moving along..'.format(str(e)))
# end try
@@ -315,10 +346,123 @@ def remove_comments(iinv: Inventory) -> Inventory:
# end for
pbar.close()

###############################################################
# Now download additional data from IRIS
###############################################################
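# Strategy: for stations present in the (domain-trimmed) IRIS inventory but
# absent from the locally held data, attempt to fetch BH?/HH? waveforms from
# the IRIS FDSN service, padded by 1 s on either side and trimmed to [st, et].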
client = Client("IRIS")
for net in inventory.networks:
for sta in net.stations:
netsta = '{}.{}'.format(net.code, sta.code)

already_added = False
for seed_id in data_added_set:
if (netsta in seed_id):
already_added = True
break
# end if
# end for
if(already_added): continue
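# Note: this substring test can over-match when one station code is a prefix
# of another (e.g. AU.KM vs AU.KMBL); comparing the net and sta fields of
# seed_id would be stricter.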

stream = []
try:
stream = client.get_waveforms(net.code, sta.code, '*', 'BH?,HH?',
st-1, et+1)
stream.trim(st, et)
except:
pass
# end try

if (len(stream)):

rows = []
for tr in stream: rows.append([tr.stats.network,
tr.stats.station,
tr.stats.location,
tr.stats.channel,
None, None, None])
rows = filter_channels_locations(rows)

for row in rows:
tr = stream.select(network=row[0],
station=row[1],
location=row[2],
channel=row[3])
tr = tr[0]
if((tr.stats.endtime - tr.stats.starttime) < (et - st)): continue
print('Adding trace: {}'.format(tr.id))

resp = None
try:
resp = inventory.get_response(tr.id, tr.stats.starttime)
except Exception as e:
continue
# end try

if(resp):
try:
oinv = inventory.select(network=tr.stats.network,
station=tr.stats.station,
location=tr.stats.location,
channel=tr.stats.channel)
oinv = remove_comments(oinv)
ods.add_stationxml(oinv)

ods.add_waveforms(tr, tag)

receivers_dict['{}.{}.{}'.format(tr.stats.network,
tr.stats.station,
tr.stats.location)] = \
{"class_name": "salvus.flow.simple_config.receiver.seismology.SideSetPoint3D",
"salvus_version": "0.12.8",
"arguments": {
"depth_in_m": 0.0,
"radius_of_sphere_in_m": 6371000.0,
"network_code": tr.stats.network,
"location_code": tr.stats.location,
"side_set_name": "r1",
"latitude": sta.latitude,
"longitude": sta.longitude,
"station_code": tr.stats.station,
"fields": [receiver_fields]}}

data_added_set.add(tr.id)
except Exception as e:
print('Failed to add inventory/waveform with error: {}. Moving along..'.format(str(e)))
# end try
# end if
# end for
#if (DEBUG): break
# end if
# end for
# end for

json.dump(receivers_dict, open(receivers_ofn, 'w+'), indent=4)
del ods
# end func

def trim_inventory(inv:Inventory, dom:Domain):
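# Drop stations whose coordinates fall outside the Salvus domain polygon;
# returns a new Inventory, or None if no station lies inside the domain.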
netsta = []

for net in inv.networks:
for sta in net.stations:
if(dom.contains(sta.longitude, sta.latitude)):
netsta.append([net.code, sta.code])
# end if
# end for
# end for

newInv = None
for ns in netsta:
if(newInv is None):
newInv = inv.select(network=ns[0], station=ns[1])
else:
newInv += inv.select(network=ns[0], station=ns[1])
# end if
# end for

return newInv
# end func

CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
@click.command(context_settings=CONTEXT_SETTINGS)
@click.argument('asdf-source', required=True,
@@ -327,7 +471,7 @@ def remove_comments(iinv: Inventory) -> Inventory:
type=click.Path(exists=True))
@click.argument('salvus-events-file', required=True,
type=click.Path(exists=True))
-@click.argument('response-stationxml', required=True,
+@click.argument('iris_inventory', required=True,
type=click.Path(exists=True))
@click.argument('output-folder', required=True,
type=click.Path(exists=True))
@@ -340,13 +484,13 @@ def remove_comments(iinv: Inventory) -> Inventory:
@click.option('--receiver-fields', type=str, default='displacement', show_default=True,
help="Name of data folder within Salvus' expected folder hierarchy")
def process(asdf_source, salvus_domain_file, salvus_events_file,
-response_stationxml, output_folder,
+iris_inventory, output_folder,
seconds_before, seconds_after, data_name, receiver_fields):
"""
ASDF_SOURCE: Path to text file containing paths to ASDF files\n
-SALVUS_DOMAIN_FILE: Path to Salvus domain file in json format
-SALVUS_EVENTS_FILE: Path to Salvus events file in json format
-RESPONSE_STATIONXML: Path to stationXML file containing instrument responses
+SALVUS_DOMAIN_FILE: Path to Salvus domain file in json format\n
+SALVUS_EVENTS_FILE: Path to Salvus events file in json format\n
+IRIS_INVENTORY: Path to complete channel-level IRIS inventory, containing instrument responses\n
OUTPUT_FOLDER: Output folder \n
"""
@@ -356,6 +500,10 @@ def process(asdf_source, salvus_domain_file, salvus_events_file,
events = None
inv = None

comm = MPI.COMM_WORLD
nproc = comm.Get_size()
rank = comm.Get_rank()

try:
fds = FederatedASDFDataSet(asdf_source)
except Exception as e:
@@ -364,42 +512,64 @@ def process(asdf_source, salvus_domain_file, salvus_events_file,
# end try

try:
-print('Reading Salvus domain file: {}'.format(salvus_domain_file))
+if(rank == 0): print('Reading Salvus domain file: {}'.format(salvus_domain_file))
dom = Domain(salvus_domain_file)
except Exception as e:
print(str(e))
assert 0, 'Failed to instantiate Domain with input file: {}'.format(salvus_domain_file)
# end try

try:
-print('Reading Salvus events file: {}'.format(salvus_events_file))
+if(rank == 0): print('Reading Salvus events file: {}'.format(salvus_events_file))
events = json.load(open(salvus_events_file, 'r'))
except Exception as e:
print(str(e))
assert 0, 'Failed to load Salvus events file: {}'.format(salvus_events_file)
# end try

-try:
-print('Reading inventory file with responses: {}'.format(response_stationxml))
-inv = read_inventory(response_stationxml)
-except Exception as e:
-print(str(e))
-assert 0, 'Failed to read inventory file: {}'.format(response_stationxml)
-# end try
+tempdir = None
+trimmed_inventory_fn = None
+if(rank == 0):
+try:
+print('Reading inventory file with responses: {}'.format(iris_inventory))
+inv = read_inventory(iris_inventory)
+except Exception as e:
+print(str(e))
+assert 0, 'Failed to read inventory file: {}'.format(iris_inventory)
+# end try
+
+# trim inventory by Domain
+inv = trim_inventory(inv, dom)
+
+# save trimmed inventory to a temp folder, so other ranks can load it
+tempdir = tempfile.mkdtemp()
+
+trimmed_inventory_fn = os.path.join(tempdir, 'trimmed_inventory.xml')
+inv.write(trimmed_inventory_fn, format="STATIONXML")
+print(trimmed_inventory_fn)
+# end if
+comm.barrier()
+
+# Load trimmed inventory on all ranks
+trimmed_inventory_fn = comm.bcast(trimmed_inventory_fn, root=0)
+inv = read_inventory(trimmed_inventory_fn)
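# Only the temp-file path (a short string) is broadcast; each rank parses the
# StationXML itself rather than receiving a pickled Inventory over MPI.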

# extract data for all events
-for ek, e in tqdm(events.items(), desc='Events: '):
-extract_data_for_event(fds, dom, {ek:e}, inv, output_folder, data_name,
+proc_ekeys = split_list(list(events.keys()), nproc)[rank]
+for ek in tqdm(proc_ekeys, desc='Rank: {}: Events: '.format(rank)):
+extract_data_for_event(fds, dom, {ek:events[ek]}, inv, output_folder, data_name,
receiver_fields,
seconds_before, seconds_after)
#if DEBUG: break
# end for


-# copy salvus-events to output folder
-shutil.copyfile(salvus_events_file, os.path.join(output_folder, 'EVENTS/event_store.json'))
+if(rank == 0):
+# copy salvus-events to output folder
+shutil.copyfile(salvus_events_file, os.path.join(output_folder, 'EVENTS/event_store.json'))
+shutil.rmtree(tempdir)
+# end if
# end func

if (__name__ == '__main__'):
process()
# end if
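
With the MPI changes above, a hypothetical invocation on a multi-core desktop (all paths are placeholders; positional arguments follow the click declarations):

```
mpirun -np 4 python asdf2salvus.py asdf_files.txt salvus_domain.json \
    salvus_events.json iris_inventory.xml output_folder
```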
