Speed-up Tribe and Party reading #462

Open · wants to merge 10 commits into base: develop
11 changes: 11 additions & 0 deletions CHANGES.md
@@ -1,3 +1,14 @@
## Current
* utils.mag_calc:
- relative_magnitude: implemented full magnitude bias-correction for CC and SNR
- relative_amplitude: returns dicts for SNR measurements
* core.match_filter.party:
- Implement parallel reading and chunked writing to save memory when writing
and to accelerate reading - PR: #462
* core.match_filter.tribe:
- Implement parallel reading and chunked writing to save memory when writing
and to accelerate reading - PR: #462

## 0.4.3
* core.match_filter
- match_filter:
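The changelog entries above describe the two user-facing knobs this PR adds. A minimal usage sketch (archive names are hypothetical; the keyword arguments match the diffs below):

from eqcorrscan.core.match_filter.party import read_party

# Read a previously written Party, spreading the catalog files across four
# reader processes (max_processes=1, the default, keeps the old behaviour)
party = read_party("my_party.tgz", max_processes=4)

# Write it back out, splitting the detection catalog into files of at most
# 500 events each so that the next read can be parallelised
party.write("my_party_out", format="tar", max_events_per_file=500)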
17 changes: 9 additions & 8 deletions eqcorrscan/core/match_filter/family.py
@@ -18,14 +18,14 @@
import logging

from obspy import UTCDateTime, Stream, Catalog
from obspy.core.event import (
StationMagnitude, Magnitude, ResourceIdentifier, WaveformStreamID,
CreationInfo, StationMagnitudeContribution)
# from obspy.core.event import (
# StationMagnitude, Magnitude, ResourceIdentifier, WaveformStreamID,
# CreationInfo, StationMagnitudeContribution)

from eqcorrscan.core.match_filter.matched_filter import _group_process
from eqcorrscan.core.match_filter.detection import Detection, get_catalog
from eqcorrscan.utils.plotting import cumulative_detections
from eqcorrscan.utils.mag_calc import relative_magnitude
# from eqcorrscan.utils.mag_calc import relative_magnitude

Logger = logging.getLogger(__name__)

@@ -824,7 +824,7 @@ def _write_family(family, filename):
return


def _read_family(fname, all_cat, template, encoding="UTF8",
def _read_family(fname, all_cat_dict, template, encoding="UTF8",
estimate_origin=True):
"""
Internal function to read csv family files.
@@ -843,11 +843,12 @@ def _read_family(fname, all_cat, template, encoding="UTF8",
key = key_pair.split(': ')[0].strip()
value = key_pair.split(': ')[-1].strip()
if key == 'event':
if len(all_cat) == 0:
if len(all_cat_dict) == 0:
gen_event = True
continue
el = [e for e in all_cat
if str(e.resource_id).split('/')[-1] == value][0]
el = all_cat_dict.get(value)
# [e for e in all_cat
# if str(e.resource_id).split('/')[-1] == value][0]
det_dict.update({'event': el})
elif key == 'detect_time':
det_dict.update(
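The change above replaces a linear scan of the whole catalog for every detection row with a dict lookup, which is the main speed-up in family reading: for m detections in a catalog of n events the cost drops from O(n*m) to O(n + m). A self-contained sketch of the idea (the event ID is made up):

from obspy import Catalog
from obspy.core.event import Event, ResourceIdentifier

all_cat = Catalog(events=[
    Event(resource_id=ResourceIdentifier("smi:local/event/abc123"))])
value = "abc123"  # the ID suffix stored in the family CSV

# Before: scan the catalog for every detection row
el = [e for e in all_cat
      if str(e.resource_id).split('/')[-1] == value][0]

# After: build the mapping once, then every lookup is O(1)
all_cat_dict = {str(e.resource_id).split('/')[-1]: e for e in all_cat}
el = all_cat_dict.get(value)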
104 changes: 79 additions & 25 deletions eqcorrscan/core/match_filter/party.py
@@ -18,6 +18,7 @@
import tarfile
import tempfile
import logging
import multiprocessing
from os.path import join

import numpy as np
@@ -559,7 +560,16 @@ def decluster(self, trig_int, timing='detect', metric='avg_cor',
"""
if self.__len__() == 0:
return self

if min_chans > 0:
self.min_chans(min_chans - 1)
# Party.min_chans uses a strict >, while this should keep >= min_chans,
# hence the - 1

all_detections = [d for fam in self.families for d in fam.detections]

if len(all_detections) == 0:
return Party()

[Review comment from the PR author] TODO: Check what these lines are about - they don't seem related to the topic of this PR?

catalog = None
if hypocentral_separation:
catalog = Catalog([d.event for fam in self.families
Expand All @@ -569,10 +579,6 @@ def decluster(self, trig_int, timing='detect', metric='avg_cor',
"hypocentral separation")
catalog = None

if min_chans > 0:
for d in all_detections.copy():
if d.no_chans < min_chans:
all_detections.remove(d)
if len(all_detections) == 0:
return Party()

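The min_chans(min_chans - 1) call above is an off-by-one translation between two conventions: as the inline comment notes, Party.min_chans keeps detections with strictly more channels than its argument, while the loop it replaces kept detections with at least min_chans channels. A sketch of the equivalence (integer channel counts assumed):

min_chans = 5
for no_chans in range(10):
    kept_old = no_chans >= min_chans       # old loop removed no_chans < min_chans
    kept_new = no_chans > (min_chans - 1)  # Party.min_chans uses a strict >
    assert kept_old == kept_new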
@@ -657,7 +663,8 @@ def copy(self):
return copy.deepcopy(self)

def write(self, filename, format='tar', write_detection_catalog=True,
catalog_format="QUAKEML", overwrite=False):
catalog_format="QUAKEML", overwrite=False,
max_events_per_file=1000):
"""
Write Family out, select output format.

@@ -682,6 +689,11 @@ def write(self, filename, format='tar', write_detection_catalog=True,
:param overwrite:
Specifies whether detection-files are overwritten if they exist
already. By default, no files are overwritten.
:type max_events_per_file: int
:param max_events_per_file:
Maximum number of events to write per catalog file. When this
number is exceeded, a new file will be written to the tar
archive - this allows for parallel IO when reading.

.. NOTE::
csv format will write out detection objects, all other
@@ -735,11 +747,16 @@ def write(self, filename, format='tar', write_detection_catalog=True,
all_cat = Catalog()
for family in self.families:
all_cat += family.catalog
if not len(all_cat) == 0:
all_cat.write(
join(temp_dir, 'catalog.{0}'.format(
CAT_EXT_MAP[catalog_format])),
i, j = 0, 0
while i < len(all_cat):
sub_catalog = Catalog(
all_cat[i:i + max_events_per_file])
sub_catalog.write(
join(temp_dir, 'catalog.{0}.{1}'.format(
j, CAT_EXT_MAP[catalog_format])),
format=catalog_format)
j += 1
i += max_events_per_file
for i, family in enumerate(self.families):
Logger.debug('Writing family %i' % i)
name = family.template.name + '_detections.csv'
@@ -754,7 +771,7 @@ def write(self, filename, format='tar', write_detection_catalog=True,
return self

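The chunked writer above emits numbered catalog files (catalog.0.xml, catalog.1.xml, ... for QuakeML, going by CAT_EXT_MAP) that read() later collects with a catalog.* glob and parses in parallel. The while loop is equivalent to a stepped range; a sketch of the same chunking, assuming all_cat is an obspy Catalog:

from obspy import Catalog

def _chunk_catalog(all_cat, max_events_per_file=1000):
    """Yield (index, sub_catalog) pairs of at most max_events_per_file events."""
    for j, i in enumerate(range(0, len(all_cat), max_events_per_file)):
        yield j, Catalog(events=all_cat.events[i:i + max_events_per_file])

# for j, sub_catalog in _chunk_catalog(all_cat):
#     sub_catalog.write(f"catalog.{j}.xml", format="QUAKEML")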
def read(self, filename=None, read_detection_catalog=True,
estimate_origin=True):
estimate_origin=True, max_processes=1):
"""
Read a Party from a file.

Expand All @@ -771,6 +788,10 @@ def read(self, filename=None, read_detection_catalog=True,
If True and no catalog is found, or read_detection_catalog is False
then new events with origins estimated from the template origin
time will be created.
:type max_processes: int
:param max_processes:
Maximum number of processes to use for reading files in parallel.
Defaults to single-threaded; if set to None, all available cores
will be used.

.. rubric:: Example

@@ -779,8 +800,9 @@
"""
from eqcorrscan.core.match_filter.tribe import Tribe

tribe = Tribe()
families = []
tribe, families = Tribe(), []
max_processes = max_processes or multiprocessing.cpu_count()

if filename is None:
# If there is no filename given, then read the example.
filename = os.path.join(
@@ -794,6 +816,8 @@
else:
# Expand wildcards
filenames = glob.glob(filename)
if len(filenames) == 0:
raise FileNotFoundError(f"{filename} does not exist")
for _filename in filenames:
Logger.info(f"Reading from {_filename}")
with tarfile.open(_filename, "r:*") as arc:
@@ -803,17 +827,32 @@
# files then we can just read in extra templates as needed.
# Read in families here!
party_dir = glob.glob(temp_dir + os.sep + '*')[0]
tribe._read_from_folder(dirname=party_dir)
det_cat_file = glob.glob(os.path.join(party_dir, "catalog.*"))
if len(det_cat_file) != 0 and read_detection_catalog:
try:
all_cat = read_events(det_cat_file[0])
except TypeError as e:
Logger.error(e)
pass
Logger.info("Reading tribe")
tribe._read_from_folder(
dirname=party_dir, max_processes=max_processes)
det_cat_files = glob.glob(os.path.join(party_dir, "catalog.*"))
Logger.info("Reading detection catalog")
if len(det_cat_files) != 0 and read_detection_catalog:
if max_processes > 1:
with multiprocessing.Pool(processes=max_processes) as pool:
_futures = pool.imap_unordered(
_read_catalog_pass_error, det_cat_files,
# Ceil-divide so the files are spread evenly across the pool
chunksize=-(-len(det_cat_files) // max_processes))
sub_catalogs = [cat for cat in _futures]
else:
sub_catalogs = [_read_catalog_pass_error(df)
for df in det_cat_files]
all_cat_dict = {
e.resource_id.id.split('/')[-1]: e
for cat in sub_catalogs
for e in cat}
else:
all_cat = Catalog()
all_cat_dict = dict()
Logger.info("Reconstructing families")
for family_file in glob.glob(join(party_dir, '*_detections.csv')):
Logger.debug(
f"Working on family file {family_file.split('/')[-1]}")
template = [
t for t in tribe if _templates_match(t, family_file)]
family = Family(template=template[0] or Template())
@@ -824,8 +863,8 @@ def read(self, filename=None, read_detection_catalog=True,
f.template.name == family.template.name][0]
new_family = False
family.detections += _read_family(
fname=family_file, all_cat=all_cat, template=template[0],
estimate_origin=estimate_origin)
fname=family_file, all_cat_dict=all_cat_dict,
template=template[0], estimate_origin=estimate_origin)
if new_family:
families.append(family)
shutil.rmtree(temp_dir)
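The parallel read above follows a standard multiprocessing pattern: a module-level worker (so the Pool can pickle it), imap_unordered (result order is irrelevant because events are re-keyed by resource ID afterwards), and a chunksize that spreads the file list across the workers. A self-contained sketch of the same pattern (file names are hypothetical):

import multiprocessing

from obspy import Catalog, read_events


def _read_one(filename):
    """Worker: read one catalog file, swallowing read errors."""
    try:
        return read_events(filename)
    except TypeError:
        return Catalog()


if __name__ == "__main__":
    det_cat_files = ["catalog.0.xml", "catalog.1.xml"]  # hypothetical
    n_proc = 2
    with multiprocessing.Pool(processes=n_proc) as pool:
        sub_catalogs = list(pool.imap_unordered(
            _read_one, det_cat_files,
            chunksize=-(-len(det_cat_files) // n_proc)))  # ceil division
    all_cat_dict = {e.resource_id.id.split('/')[-1]: e
                    for cat in sub_catalogs for e in cat}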
@@ -1090,7 +1129,8 @@ def min_chans(self, min_chans):
return self


def read_party(fname=None, read_detection_catalog=True, *args, **kwargs):
def read_party(fname=None, read_detection_catalog=True, max_processes=1,
*args, **kwargs):
"""
Read detections and metadata from a tar archive.

@@ -1102,15 +1142,29 @@ def read_party(fname=None, read_detection_catalog=True, *args, **kwargs):
:param read_detection_catalog:
Whether to read the detection catalog or not, if False, catalog
will be regenerated - for large catalogs this can be faster.
:type max_processes: int
:param max_processes:
Maximum number of processes to use for reading files in parallel.
Defaults to single-threaded.

:return: :class:`eqcorrscan.core.match_filter.Party`
"""
party = Party()
party.read(filename=fname, read_detection_catalog=read_detection_catalog,
*args, **kwargs)
max_processes=max_processes, *args, **kwargs)
return party

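Because Party.read falls back to multiprocessing.cpu_count() when max_processes is falsy, passing None through read_party uses every available core. A one-line sketch (archive name hypothetical):

party = read_party("my_party.tgz", max_processes=None)  # use all cores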

def _read_catalog_pass_error(filename):
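    """Read a catalog file; log and return an empty Catalog on failure."""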
Logger.debug(f"Reading {filename}")
try:
cat = read_events(filename)
except TypeError as e:
Logger.error(e)
cat = Catalog()
return cat


if __name__ == "__main__":
import doctest
