SSST relocation (#243)
* Structural Changes for SSST-relocation

* SSST Relocation

* Added a new class for manipulating parametric data
* Updated picking routines to use the new parametric-data class

* The entire file content is no longer loaded into memory.

* Arrivals are now labelled by event source, e.g. GA, ISC, etc.

* Updating ellip-corr hash

* Updating ellip-corr

* New Classes for SSST-Relocation

* Added a new class for travel-time interpolation
* Added a bare-bones ssst-relocator class

* Added a parallel function for tt interpolation

* Removing MPI-parallel Catalogue Ingestion

MPI Allgatherv is prone to data corruption when tens of processors are
used to gather arrays amounting to hundreds of MB on all ranks.
Catalogue ingestion now takes place on rank 0, and events and arrivals
are saved to disk; subsequently, all other ranks simply load these
events and arrivals from disk.
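
A minimal sketch of this pattern, assuming mpi4py; the file names
(`events.npy`, `arrivals.npy`) are illustrative and `ingest_catalogue` is a
hypothetical stand-in for the actual ingestion routine:

    import numpy as np
    from mpi4py import MPI

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()

    if rank == 0:
        # rank 0 alone parses the catalogue and persists the arrays
        events, arrivals = ingest_catalogue()  # hypothetical ingestion routine
        np.save('events.npy', events)
        np.save('arrivals.npy', arrivals)
    # end if

    comm.Barrier()  # files must exist before the other ranks read them

    if rank > 0:
        events = np.load('events.npy')
        arrivals = np.load('arrivals.npy')
    # end if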

* Minor typos

* Disk-based Sync Across MPI Ranks

* Added a disk-based sync approach for variables across MPI ranks. The
rationale behind this is documented in Jira.
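
One plausible shape for such a disk-based sync, sketched here with pickle
for arbitrary Python objects (the helper name `sync_via_disk` and the
scratch-file handling are assumptions, not the actual implementation):

    import os
    import pickle
    from mpi4py import MPI

    def sync_via_disk(obj, fname, root=0, comm=MPI.COMM_WORLD):
        """Broadcast obj from root to all ranks via a scratch file."""
        rank = comm.Get_rank()

        if rank == root:
            with open(fname, 'wb') as fh:
                pickle.dump(obj, fh, pickle.HIGHEST_PROTOCOL)
        # end if

        comm.Barrier()  # the file must be fully written before anyone reads it

        if rank != root:
            with open(fname, 'rb') as fh:
                obj = pickle.load(fh)
        # end if

        comm.Barrier()  # keep ranks in lockstep until all reads complete
        if rank == root: os.remove(fname)

        return obj
    # end func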

* Added methods for computing ellipticity corrections

* New class for vectorized computation of ellipticity corrections.

* Added a function to coalesce discrepant network codes
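
The resolution rule is not spelled out in this commit message; a hedged
sketch of one reasonable approach (mapping each station to its most
frequently observed network code) might look like:

    from collections import defaultdict
    import numpy as np

    def coalesce_network_codes(net, sta):
        """For stations reported under several network codes, pick a single
           code per station (assumption: the most frequent code wins)."""
        counts = defaultdict(lambda: defaultdict(int))
        for n, s in zip(net, sta): counts[s][n] += 1
        # end for

        preferred = {s: max(c, key=c.get) for s, c in counts.items()}
        return np.array([preferred[s] for s in sta])
    # end func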

* Added a function for parallel residual computation

* Adding ellipticity correction table

* Adding support for masked local computations on each rank
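
A hedged sketch of the masked-computation pattern (array sizes, the mask
construction and the data are illustrative only):

    import numpy as np
    from mpi4py import MPI

    comm = MPI.COMM_WORLD
    rank, nproc = comm.Get_rank(), comm.Get_size()

    global_n = 1000000  # illustrative: total number of arrivals
    local_mask = np.zeros(global_n, dtype=bool)
    local_mask[np.array_split(np.arange(global_n), nproc)[rank]] = True

    # each rank computes residuals only over its masked slice of the
    # global arrays; results can later be combined via the disk-based sync
    observed = np.random.rand(global_n).astype('f4')   # illustrative data
    predicted = np.random.rand(global_n).astype('f4')  # illustrative data
    residual = np.zeros(global_n, dtype='f4')
    residual[local_mask] = observed[local_mask] - predicted[local_mask]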

* Parallelized phase-redefinition

* Added parallel ssst-correction calculator

* Moving function for coalescing network codes to base class

* Adding some documentation

* Working draft of SSST-relocator

* Refactored SSST-relocator

* refactored and cleaned up SSST-relocator
* minor changes to travel_time and ellipticity modules

* Config Parser and Miscellaneous Changes

* Added config parser for relocation/redefinition
* Added support for dropping automatic phases that do not meet quality
  criteria during relocation

* Adding SST Relocation

* Added SST relocation as a potential first step
* Optimized parallel sync

* Miscellaneous Optimizations

* For arrivals whose phases are redefined without their origins being
  relocated, SST corrections (if available) are now factored in during
  phase redefinition in each SSST-relocation iteration.

* Adding a CLI

* Minor cleanup

* Adding Plotting Routines

* Minor cleanups in ssst_relocator and moved CLI to ssst_relocate
* Added routines for generating detailed plots in ssst_relocate

* Added Arrivals Exporter

* Some minor changes in ssst_relocator
* Added export_arrivals for exporting cleansed arrivals that meet
  user-defined criteria

* Miscellaneous Changes

* cwb2asdf.py can now convert semi-permanent mseed files into ASDF
* extract_event_traces.py now reflects recent changes made to the picking
  workflow
* added more plots to export_arrivals.py
geojunky committed Aug 16, 2022
1 parent bb47854 commit 3f17559
Showing 32 changed files with 5,233 additions and 149 deletions.
2 changes: 1 addition & 1 deletion ellip-corr
2 changes: 1 addition & 1 deletion seismic/ASDFdatabase/FederatedASDFDataSet.py
@@ -130,7 +130,7 @@ def get_stations(self, starttime, endtime, network=None, station=None, location=
        :param location: location code (optional)
        :param channel: channel code (optional)
-        :return: a list containing [net, sta, loc, cha, lon, lat] in each row
+        :return: a list containing [net, sta, loc, cha, lon, lat, elev_m] in each row
        """
        results = self.fds.get_stations(starttime, endtime, network, station, location, channel)
        return results
16 changes: 9 additions & 7 deletions seismic/ASDFdatabase/_FederatedASDFDataSetImpl.py
@@ -326,7 +326,7 @@ def decode_tag(tag, type='raw_recording'):
        self.conn.execute('create table wdb(ds_id smallint, net varchar(6), sta varchar(6), loc varchar(6), '
                          'cha varchar(6), st double, et double, tag text)')
        self.conn.execute('create table netsta(ds_id smallint, net varchar(6), sta varchar(6), lon double, '
-                          'lat double)')
+                          'lat double, elev_m double)')
        self.conn.execute('create table masterinv(inv blob)')

        metadatalist = []
@@ -349,12 +349,13 @@ def decode_tag(tag, type='raw_recording'):
            for k in list(coords_dict.keys()):
                lon = coords_dict[k]['longitude']
                lat = coords_dict[k]['latitude']
+                elev_m = coords_dict[k]['elevation_in_m']
                nc, sc = k.split('.')
-                metadatalist.append([ids, nc, sc, lon, lat])
+                metadatalist.append([ids, nc, sc, lon, lat, elev_m])
            # end for
        # end for
-        self.conn.executemany('insert into netsta(ds_id, net, sta, lon, lat) values '
-                              '(?, ?, ?, ?, ?)', metadatalist)
+        self.conn.executemany('insert into netsta(ds_id, net, sta, lon, lat, elev_m) values '
+                              '(?, ?, ?, ?, ?, ?)', metadatalist)
        self.conn.execute('insert into masterinv(inv) values(?)',
                          [cPickle.dumps(masterinv, cPickle.HIGHEST_PROTOCOL)])
        self.conn.commit()
@@ -417,8 +418,8 @@ def decode_tag(tag, type='raw_recording'):
        # Load metadata
        rows = self.conn.execute('select * from netsta').fetchall()
        for row in rows:
-            ds_id, net, sta, lon, lat = row
-            self.asdf_station_coordinates[ds_id]['%s.%s' % (net.strip(), sta.strip())] = [lon, lat]
+            ds_id, net, sta, lon, lat, elev_m = row
+            self.asdf_station_coordinates[ds_id]['%s.%s' % (net.strip(), sta.strip())] = [lon, lat, elev_m]
        # end for

        # Load master inventory
@@ -468,7 +469,8 @@ def get_stations(self, starttime, endtime, network=None, station=None, location=

                rv = (net, sta, loc, cha,
                      self.asdf_station_coordinates[ds_id]['%s.%s' % (net, sta)][0],
-                      self.asdf_station_coordinates[ds_id]['%s.%s' % (net, sta)][1])
+                      self.asdf_station_coordinates[ds_id]['%s.%s' % (net, sta)][1],
+                      self.asdf_station_coordinates[ds_id]['%s.%s' % (net, sta)][2])
                results.add(rv)
            # end for

26 changes: 23 additions & 3 deletions seismic/ASDFdatabase/cwb2asdf/cwb2asdf.py
@@ -60,12 +60,15 @@ def make_ASDF_tag(tr, tag):
@click.argument('inventory', required=True,
                type=click.Path(exists=True))
@click.argument('output-file-name', required=True)
+@click.option('--channels-to-extract', type=str, default=None, help="Channels to extract, within quotes and space- "
+                                                                    "separated.")
@click.option('--min-length-sec', type=int, default=None, help="Minimum length in seconds")
@click.option('--merge-threshold', type=int, default=None, help="Merge traces if the number of traces fetched for an "
                                                                "interval exceeds this threshold")
@click.option('--ntraces-per-file', type=int, default=3600, help="Maximum number of traces per file; if exceeded, the "
                                                                 "file is ignored.")
-def process(input_folder, inventory, output_file_name, min_length_sec, merge_threshold, ntraces_per_file):
+def process(input_folder, inventory, output_file_name, channels_to_extract, min_length_sec, merge_threshold,
+            ntraces_per_file):
    """
    INPUT_FOLDER: Path to input folder containing miniseed files \n
    INVENTORY: Path to FDSNStationXML inventory containing channel-level metadata for all stations \n
@@ -92,6 +95,11 @@ def _write(ds, ostream, inventory_dict, netsta_set):
        # end for
    # end func

+    # process channels-to-extract
+    if(channels_to_extract is not None):
+        channels_to_extract = set([item.strip().upper() for item in channels_to_extract.split()])
+    # end if
+
    comm = MPI.COMM_WORLD
    nproc = comm.Get_size()
    rank = comm.Get_rank()
@@ -115,20 +123,29 @@ def _write(ds, ostream, inventory_dict, netsta_set):

    files = np.array(files)
    random.Random(nproc).shuffle(files)
-    # files = files[:100]
+    #files = files[370:380]

    ustations = set()
    networklist = []
    stationlist = []
+    filtered_files = []
    for file in files:
-        _, _, net, sta, _ = file.split('.')
+        #_, _, net, sta, _ = file.split('.')
+        #tokens = os.path.basename(file).split('.')
+        #net, sta = tokens[0], tokens[1]
+
+        st = read(file, headonly=True)
+        if(len(st) == 0): continue
+
+        net = st[0].meta.network
+        sta = st[0].meta.station
+
        ustations.add('%s.%s' % (net, sta))
        networklist.append(net)
        stationlist.append(sta)
+        filtered_files.append(file)
    # end for
+    files = np.array(filtered_files)

    networklist = np.array(networklist)
    stationlist = np.array(stationlist)
@@ -222,6 +239,9 @@ def _write(ds, ostream, inventory_dict, netsta_set):

            if(len(ostream) < BUFFER_LENGTH):
                for tr in st:
+                    if (channels_to_extract):
+                        if (tr.meta.channel not in channels_to_extract): continue
+                    # end if

                    if (tr.stats.npts == 0): continue
                    if (min_length_sec):
21 changes: 10 additions & 11 deletions seismic/extract_event_traces.py
@@ -170,7 +170,7 @@ def asdf_get_waveforms(asdf_dataset, network, station, location, channel, startt
                                          location=location)
    if matching_stations:
        ch_matcher = re.compile(channel)
-        for net, sta, loc, cha, _, _ in matching_stations:
+        for net, sta, loc, cha, _, _, _ in matching_stations:
            if ch_matcher.match(cha):
                st += asdf_dataset.get_waveforms(net, sta, loc, cha, starttime, endtime)
        # end for
@@ -303,21 +303,20 @@ def __init__(self, taup_model_name):
    def pick(self, ztrace, ntrace, etrace, phase='P'):
        slope_ratio = -1
        arrival_time = UTC(-1)
-        origin = Origin(ztrace.stats.event_time,
-                        ztrace.stats.event_latitude,
-                        ztrace.stats.event_longitude,
-                        ztrace.stats.event_depth)
-        event = Event()
-        event.preferred_origin = origin
-        event.public_id = Picker.counter; Picker.counter += 1
-        event.preferred_magnitude = Magnitude(mag=ztrace.stats.event_magnitude, mag_type='M')
+
+        # construct a named array for event meta-data, as expected in extract_[p/s]
+        event_fields = {'names': ['source', 'event_id', 'origin_ts', 'mag', 'lon', 'lat', 'depth_km'],
+                        'formats': ['S10', 'i4', 'f8', 'f4', 'f4', 'f4', 'f4']}
+        events = np.array([('', 0, ztrace.stats.event_time.timestamp,
+                            ztrace.stats.event_magnitude, ztrace.stats.event_longitude,
+                            ztrace.stats.event_latitude, ztrace.stats.event_depth)], dtype=event_fields)

        result = None
        if(phase == 'P'):
-            result = extract_p(self._taup_model, self._picker_list_p, event, ztrace.stats.station_longitude,
+            result = extract_p(self._taup_model, self._picker_list_p, events[0], ztrace.stats.station_longitude,
                               ztrace.stats.station_latitude, Stream(ztrace), margin=5)
        elif(phase == 'S'):
-            result = extract_s(self._taup_model, self._picker_list_s, event, ztrace.stats.station_longitude,
+            result = extract_s(self._taup_model, self._picker_list_s, events[0], ztrace.stats.station_longitude,
                               ztrace.stats.station_latitude, Stream(ntrace), Stream(etrace),
                               ntrace.stats.back_azimuth, margin=10)
        else:
1 change: 1 addition & 0 deletions seismic/pick_harvester/ellip/elcordir.tbl

Large diffs are not rendered by default.

162 changes: 162 additions & 0 deletions seismic/pick_harvester/ellipticity.py
@@ -0,0 +1,162 @@
from collections import defaultdict
import numpy as np
from scipy.interpolate import RegularGridInterpolator
import traceback
import os

class Phase:
    def __init__(self, ecdist, depth, tau0, tau1, tau2):
        self.ecdist = ecdist
        self.depth = depth
        self.tau0 = tau0
        self.tau1 = tau1
        self.tau2 = tau2

        self.tau0io = RegularGridInterpolator((self.ecdist, self.depth), self.tau0, method='linear',
                                              bounds_error=False, fill_value=0.)
        self.tau1io = RegularGridInterpolator((self.ecdist, self.depth), self.tau1, method='linear',
                                              bounds_error=False, fill_value=0.)
        self.tau2io = RegularGridInterpolator((self.ecdist, self.depth), self.tau2, method='linear',
                                              bounds_error=False, fill_value=0.)
    # end func
# end class

class Ellipticity:
    def __init__(self):
        self._elcordir_tbl = os.path.dirname(os.path.abspath(__file__)) + '/ellip/elcordir.tbl'
        self.depth = np.array([0.0, 100.0, 200.0, 300.0, 500.0, 700.0], dtype='f4')
        self.phases = defaultdict(list)
        self.phase_alias = {b'Pg': b'Pup',
                            b'Sg': b'Sup',
                            b'pPg': b'pP',
                            b'sPg': b'sP',
                            b'pSg': b'pS',
                            b'sSg': b'sS',
                            b'Pb': b'P',
                            b'Sb': b'S',
                            b'pPb': b'pP',
                            b'sPb': b'sP',
                            b'pSb': b'pS',
                            b'sSb': b'sS',
                            b'Pn': b'P',
                            b'Sn': b'S',
                            b'pPn': b'pP',
                            b'sPn': b'sP',
                            b'pSn': b'pS',
                            b'sSn': b'sS',
                            b'SPn': b'SP',
                            b'SPb': b'SP',
                            b'SPg': b'SP',
                            b'SnP': b'SP',
                            b'PSn': b'PS',
                            b'PnPn': b'PP',
                            b'SnSn': b'SS',
                            b'p': b'Pup',
                            b's': b'Sup',
                            b'Pdif': b'Pdiff',
                            b'Sdif': b'Sdiff'}
        self._parse_table()
    # end func

    def _parse_table(self):
        # The table appears to be organised as fixed-width, 80-character
        # records: a 4-token record introduces a new phase, a 1-token record
        # gives an epicentral-distance sample, and 6-token records carry the
        # tau0/tau1/tau2 coefficients (one value per depth node) in rotation.
        line = open(self._elcordir_tbl).read()
        ll = len(line)

        indices = np.arange(0, ll + 80, 80)
        rows = list()

        for i in range(len(indices) - 1):
            row = line[indices[i]:indices[i + 1]].split()
            rows.append(row)
        # end for

        phase = None
        ecdist = list()
        tau = 0
        tau0 = list()
        tau1 = list()
        tau2 = list()
        for irow, row in enumerate(rows):
            if len(row) == 4:
                if irow:
                    # a new phase record: flush the one accumulated so far
                    ecdist = np.array(ecdist).astype('f4')
                    tau0 = np.array(tau0).astype('f4')
                    tau1 = np.array(tau1).astype('f4')
                    tau2 = np.array(tau2).astype('f4')
                    self.phases[phase] = Phase(ecdist, self.depth, tau0, tau1, tau2)
                    ecdist = list()
                    tau0 = list()
                    tau1 = list()
                    tau2 = list()
                # end if
                phase = (row[0]).encode()  # convert to byte-string
            elif len(row) == 1:
                ecdist.append(float(row[0]))
            elif len(row) == 6 and tau == 0:
                tau0.append(row)
                tau = (tau + 1) % 3
            elif len(row) == 6 and tau == 1:
                tau1.append(row)
                tau = (tau + 1) % 3
            elif len(row) == 6 and tau == 2:
                tau2.append(row)
                tau = (tau + 1) % 3
            # end if
        # end for

        # flush the final phase (defensive: only needed if the table does not
        # end on a phase-header record)
        if phase is not None and len(ecdist):
            ecdist = np.array(ecdist).astype('f4')
            tau0 = np.array(tau0).astype('f4')
            tau1 = np.array(tau1).astype('f4')
            tau2 = np.array(tau2).astype('f4')
            self.phases[phase] = Phase(ecdist, self.depth, tau0, tau1, tau2)
        # end if

        for pnew, palias in self.phase_alias.items():
            if(palias in self.phases.keys()): self.phases[pnew] = self.phases[palias]
        # end for
    # end func

    def get_correction(self, phase, ecdist, depth_km, elat, azim):
        # Standard three-coefficient (tau) ellipticity correction:
        # correction = sc0*tau0 + sc1*cos(azim)*tau1 + sc2*cos(2*azim)*tau2,
        # where sc0, sc1 and sc2 depend only on the event colatitude.
        azim = np.radians(azim)
        ecolat = np.radians(90 - elat)
        s3 = np.sqrt(3.0) / 2.0

        try:
            if(type(phase) == np.ndarray):
                # vectorized path: process all arrivals of each phase at once
                result = np.zeros(len(phase), dtype='f4')

                corr_count = 0
                for cphase in self.phases.keys():
                    indices = np.argwhere(cphase == phase).flatten()

                    if(len(indices)):
                        sc0 = 0.25 * (1.0 + 3.0 * np.cos(2.0 * ecolat[indices]))
                        sc1 = s3 * np.sin(2.0 * ecolat[indices])
                        sc2 = s3 * np.sin(ecolat[indices]) * np.sin(ecolat[indices])

                        tau0 = self.phases[cphase].tau0io((ecdist[indices], depth_km[indices]))
                        tau1 = self.phases[cphase].tau1io((ecdist[indices], depth_km[indices]))
                        tau2 = self.phases[cphase].tau2io((ecdist[indices], depth_km[indices]))

                        result[indices] = sc0*tau0 + sc1*np.cos(azim[indices])*tau1 + \
                                          sc2*np.cos(2.0*azim[indices])*tau2
                        corr_count += len(indices)
                    # end if
                # end for
                #if(corr_count < len(phase)): print('Warning: some phases {} not found..'.format(set(list(phase)) - set(self.phases.keys())))

                return result
            else:
                # scalar path
                result = 0.
                sc0 = 0.25 * (1.0 + 3.0 * np.cos(2.0 * ecolat))
                sc1 = s3 * np.sin(2.0 * ecolat)
                sc2 = s3 * np.sin(ecolat) * np.sin(ecolat)

                tau0 = self.phases[phase].tau0io((ecdist, depth_km))
                tau1 = self.phases[phase].tau1io((ecdist, depth_km))
                tau2 = self.phases[phase].tau2io((ecdist, depth_km))

                result = sc0 * tau0 + sc1 * np.cos(azim) * tau1 + \
                         sc2 * np.cos(2.0 * azim) * tau2

                return result
            # end if
        except Exception:
            print(traceback.format_exc())
            raise ValueError('Phase not found..')
        # end try
    # end func
# end class
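
A brief usage sketch for the new class (values are illustrative only; note
that phase names are byte-strings, matching the tables above):

    import numpy as np
    from seismic.pick_harvester.ellipticity import Ellipticity

    ellip = Ellipticity()

    # scalar query: P at 50 deg epicentral distance, 100 km depth,
    # event latitude 30 deg, azimuth 45 deg
    corr_sec = ellip.get_correction(b'P', 50., 100., 30., 45.)

    # vectorized query over several arrivals at once
    corrs = ellip.get_correction(np.array([b'P', b'S', b'Pn']),
                                 np.array([50., 60., 10.]),   # distance (deg)
                                 np.array([100., 35., 10.]),  # depth (km)
                                 np.array([30., -12., 5.]),   # event lat (deg)
                                 np.array([45., 120., 270.])) # azimuth (deg)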
