Added support for handling HDF5 files created by Omicron.exe #73

Merged
1 commit merged on Nov 5, 2018
1 change: 1 addition & 0 deletions .travis.yml
@@ -29,6 +29,7 @@ script:
   - python -m coverage run -m pytest omicron/
   - python -m coverage run -a $(which omicron-process) --help
   - python -m coverage run -a $(which omicron-status) --help
+  - python -m coverage run -a $(which omicron-hdf5-merge) --help
 
 after_success:
   - python -m pip install coveralls
46 changes: 46 additions & 0 deletions bin/omicron-hdf5-merge
@@ -0,0 +1,46 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright (C) Duncan Macleod (2018)
+#
+# This file is part of LIGO-Omicron.
+#
+# LIGO-Omicron is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# LIGO-Omicron is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with LIGO-Omicron. If not, see <http://www.gnu.org/licenses/>.
+
+"""Merge HDF5 files into one
+"""
+
+import os.path
+import argparse
+
+from omicron import (io, __version__)
+
+__author__ = 'Duncan Macleod <duncan.macleod@ligo.org>'
+
+parser = argparse.ArgumentParser(description=__doc__)
+parser.add_argument('-V', '--version', action='version', version=__version__)
+parser.add_argument('filename', nargs='+', help='file to merge')
+parser.add_argument('output', help='output file name')
+parser.add_argument('-d', '--remove-input', action='store_true', default=False,
+                    help='remove input files after writing output, '
+                         'default: %(default)s')
+
+args = parser.parse_args()
+io.merge_hdf5_files(args.filename, args.output)
+
+# remove input files
+if args.remove_input:
+    for f in args.filename:
+        if os.path.samefile(f, args.output):
+            continue
+        os.remove(f)
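For reference, a typical invocation of the new entry point might look like this (file names here are hypothetical):

    omicron-hdf5-merge L1-TEST_OMICRON-1000000000-64.h5 L1-TEST_OMICRON-1000000064-64.h5 L1-TEST_OMICRON-1000000000-128.h5 --remove-input

which merges the first two files into the third and then deletes the inputs, skipping any input that is the same file as the output.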
70 changes: 45 additions & 25 deletions bin/omicron-process
@@ -223,13 +223,17 @@ pipeg.add_argument('--skip-omicron', action='store_true', default=False,
 pipeg.add_argument('--skip-root-merge', action='store_true', default=False,
                    help='skip running omicron-root-merge, '
                         'default: %(default)s')
+pipeg.add_argument('--skip-hdf5-merge', action='store_true', default=False,
+                   help='skip running omicron-hdf5-merge, '
+                        'default: %(default)s')
 pipeg.add_argument('--skip-ligolw_add', action='store_true', default=False,
                    help='skip running ligolw_add, default: %(default)s')
 pipeg.add_argument('--skip-gzip', action='store_true', default=False,
                    help='skip running gzip, default: %(default)s')
 pipeg.add_argument('--skip-postprocessing', action='store_true', default=False,
                    help='skip all post-processing, equivalent to '
-                        '--skip-root-merge --skip-lioglw_add --skip-gzip, '
+                        '--skip-root-merge --skip-hdf5-merge '
+                        '--skip-ligolw_add --skip-gzip, '
                         'default: %(default)s')
 
 args = parser.parse_args()
@@ -247,12 +251,13 @@ if args.executable is None:
                  "--executable on the command line")
 
 # validate processing options
-if all((args.skip_root_merge, args.skip_ligolw_add, args.skip_gzip,
-        not args.archive)):
+if all((args.skip_root_merge, args.skip_hdf5_merge, args.skip_ligolw_add,
+        args.skip_gzip, not args.archive)):
     args.skip_postprocessing = True
 if args.archive:
     argsd = vars(args)
-    for arg in ['skip-root-merge', 'skip-ligolw-add', 'skip-gzip']:
+    for arg in ['skip-root-merge', 'skip-hdf5-merge',
+                'skip-ligolw-add', 'skip-gzip']:
         if argsd[arg.replace('-', '_')]:
             parser.error("Cannot use --%s with --archive" % arg)
 
@@ -717,6 +722,7 @@ ppjob.add_condor_cmd('+OmicronPostProcess', '"%s"' % group)
 ppjob.add_short_opt('e', '')
 ppnodes = []
 rootmerge = utils.which('omicron-root-merge')
+hdf5merge = utils.which('omicron-hdf5-merge')
 ligolw_add = utils.which('ligolw_add')
 gzip = utils.which('gzip')
 
@@ -819,6 +825,28 @@ for s, e in segs:
                     archivefiles[target] = [root]
                 rmfiles.append(root)
 
+            # add HDF5 operations
+            if 'hdf5' in fileformats:
+                hdf5files = ' '.join(omicronfiles[c]['hdf5'])
+                for f in omicronfiles[c]['hdf5']:
+                    ppnode._CondorDAGNode__input_files.append(f)
+                if args.skip_hdf5_merge or (
+                        len(omicronfiles[c]['hdf5']) == 1):
+                    hdf5 = hdf5files
+                else:
+                    hdf5 = os.path.join(
+                        chandir, filename.replace('root', 'h5'))
+                    operations.append('{cmd} {infiles} {outfile}'.format(
+                        cmd=hdf5merge, infiles=hdf5files, outfile=hdf5))
+                    rmfiles.append(hdf5files)
+                ppnode._CondorDAGNode__output_files.append(hdf5)
+                if args.archive:
+                    try:
+                        archivefiles[target].append(hdf5)
+                    except KeyError:
+                        archivefiles[target] = [hdf5]
+                    rmfiles.append(hdf5)
+
             # add LIGO_LW operations
             if 'xml' in fileformats:
                 xmlfiles = ' '.join(omicronfiles[c]['xml'])
@@ -895,9 +923,7 @@ if omicronv >= 'v2r2':
 # do all archiving last, once all post-processing has completed
 if args.archive:
     archivenode = pipeline.CondorDAGNode(archivejob)
-    xmlcache = Cache()
-    rootcache = Cache()
-    txtcache = Cache()
+    acache = {fmt: Cache() for fmt in fileformats}
     if newdag:
         # write shell script to seed archive
         with open(archivejob.get_executable(), 'w') as f:
@@ -913,26 +939,20 @@ if args.archive:
                              x in filelist]
                 for fn in filenames:
                     archivenode._CondorDAGNode__output_files.append(fn)
-                xmlcache.extend(CacheEntry.from_T050017(x) for x in filenames
-                                if x.endswith(('xml.gz', 'xml')))
-                rootcache.extend(CacheEntry.from_T050017(x) for x in filenames
-                                 if x.endswith('root'))
-                txtcache.extend(CacheEntry.from_T050017(x) for x in filenames
-                                if x.endswith('txt'))
+                for fmt, extensions in {
+                        'xml': ('xml.gz', '.xml'),
+                        'root': 'root',
+                        'hdf5': 'h5',
+                        'txt': 'txt',
+                }.items():
+                    acache[fmt].extend(map(CacheEntry.from_T050017, filter(
+                        lambda x: x.endswith(extensions), filenames)))
         os.chmod(archivejob.get_executable(), 0o755)
         # write caches to disk
-        if 'xml' in fileformats:
-            xmlcachefile = os.path.join(cachedir, 'omicron-xml.lcf')
-            data.write_cache(xmlcache, xmlcachefile)
-            logger.debug("XML cache written to %s" % xmlcachefile)
-        if 'root' in fileformats:
-            rootcachefile = os.path.join(cachedir, 'omicron-root.lcf')
-            data.write_cache(rootcache, rootcachefile)
-            logger.debug("ROOT cache written to %s" % rootcachefile)
-        if 'txt' in fileformats:
-            txtcachefile = os.path.join(cachedir, 'omicron-txt.lcf')
-            data.write_cache(txtcache, txtcachefile)
-            logger.debug("ASCII cache written to %s" % txtcachefile)
+        for fmt, fcache in acache.items():
+            cachefile = os.path.join(cachedir, 'omicron-{0}.lcf'.format(fmt))
+            data.write_cache(fcache, cachefile)
+            logger.debug("{0} cache written to {1}".format(fmt, cachefile))
     # add node to DAG
     for node in ppnodes:
         archivenode.add_parent(node)
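For orientation, the merge operation queued above expands to a command of the form (paths hypothetical):

    omicron-hdf5-merge <chandir>/L1-TEST_OMICRON-1000000000-64.h5 <chandir>/L1-TEST_OMICRON-1000000064-64.h5 <chandir>/L1-TEST_OMICRON-1000000000-128.h5

i.e. all per-segment HDF5 files for a channel followed by the single merged output path, matching the positional arguments of bin/omicron-hdf5-merge.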
77 changes: 77 additions & 0 deletions omicron/io.py
@@ -23,6 +23,9 @@
 import os.path
 import glob
 import re
+from collections import defaultdict
+
+import numpy
 
 from glue.lal import Cache
 
@@ -224,3 +227,77 @@ def get_archive_filename(channel, start, duration, ext='xml.gz',
     else:
         gps5 = str(int(start))[:5]
     return os.path.join(archive, ifo, description, gps5, filename)
+
+
+def merge_hdf5_files(inputfiles, outputfile, **compression_kw):
+    """Merge several HDF5 files into a single file
+
+    Parameters
+    ----------
+    inputfiles : `list` of `str`
+        the paths of the input HDF5 files to merge
+    outputfile : `str`
+        the path of the output HDF5 file to write
+    **compression_kw
+        compression options (e.g. ``compression``, ``compression_opts``)
+        to use when creating each output dataset; by default these are
+        copied from the corresponding datasets of the input files
+
+    Returns
+    -------
+    outputfile : `str`
+        the path of the merged output file
+
+    Notes
+    -----
+    This method requires the `h5py <https://www.h5py.org/>`_ package.
+    """
+    import h5py
+
+    # get list of datasets
+    attributes = {}
+    datasets = {}
+    for path in inputfiles:
+        with h5py.File(path, 'r') as h5f:
+            attributes = dict(h5f.attrs)
+            for dset in h5f:
+                # assert datatype is the same, and append shape
+                shape = h5f[dset].shape
+                dtype = h5f[dset].dtype
+                try:
+                    shape = numpy.sum(datasets[dset][0] + shape,
+                                      keepdims=True)
+                except KeyError:
+                    chunk = shape
+                else:
+                    assert dtype == datasets[dset][1], (
+                        "Cannot merge {0}/{1}, does not match dtype".format(
+                            path, dset))
+                    if chunk != datasets[dset][2]:
+                        chunk = True
+                datasets[dset] = (shape, dtype, chunk)
+
+                # use default compression options from this file
+                for copt in ('compression', 'compression_opts'):
+                    compression_kw.setdefault(copt, getattr(h5f[dset], copt))
+
+    # combine sets
+    position = defaultdict(int)
+    with h5py.File(outputfile, 'w') as h5out:
+        # copy attributes (just from last file)
+        h5out.attrs.update(attributes)
+        # create datasets
+        for dset, (shape, dtype, chunk) in datasets.items():
+            h5out.create_dataset(dset, shape=shape, dtype=dtype,
+                                 chunks=chunk, **compression_kw)
+        # copy dataset contents
+        for path in inputfiles:
+            with h5py.File(path, 'r') as h5in:
+                for dset in datasets:
+                    data = h5in[dset]
+                    size = data.shape[0]
+                    pos = position[dset]
+                    h5out[dset][pos:pos+size] = data
+                    position[dset] += size
+
+    return outputfile
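As a quick sanity check of the merge logic, the new function can be exercised end-to-end on throwaway files. This is a minimal sketch (file, dataset, and attribute names are hypothetical) using only h5py and numpy, both of which this PR adds as requirements:

    import h5py
    import numpy
    from omicron import io

    # write two small single-dataset files with a shared attribute
    for i, path in enumerate(('part1.h5', 'part2.h5')):
        with h5py.File(path, 'w') as h5f:
            h5f.create_dataset('time', data=numpy.arange(5) + 5 * i)
            h5f.attrs['channel'] = 'L1:TEST'

    # datasets are concatenated along the first axis, in input order
    io.merge_hdf5_files(['part1.h5', 'part2.h5'], 'merged.h5')

    with h5py.File('merged.h5', 'r') as h5f:
        assert list(h5f['time'][:]) == list(range(10))
        assert h5f.attrs['channel'] == 'L1:TEST'  # attributes come from the last input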
18 changes: 12 additions & 6 deletions omicron/parameters.py
@@ -67,6 +67,9 @@ class OmicronParameters(configparser.ConfigParser):
     OMICRON_DEFAULTS['v2r2'] = {
         'PARAMETER': {'FFTPLAN': 'FFTW_ESTIMATE'},
     }
+    OMICRON_DEFAULTS['v2r3'] = {
+        'OUTPUT': {'FORMAT': 'root xml hdf5'},
+    }
 
     def __init__(self, version=None, defaults=dict(), **kwargs):
         configparser.ConfigParser.__init__(self, defaults=defaults, **kwargs)
@@ -460,10 +463,13 @@ def output_files(self, start, end, flatten=False):
         segments = self.output_segments(start, end)
 
         # parse list of file formats
-        fileformats = []
-        for form in ['root', 'txt', 'xml']:
-            if form in self.get('OUTPUT', 'FORMAT'):
-                fileformats.append(form)
+        fileformats = {
+            form: ext for (form, ext) in {
+                'root': 'root',
+                'txt': 'txt',
+                'xml': 'xml',
+                'hdf5': 'h5',
+            }.items() if form in self.get('OUTPUT', 'FORMAT')}
 
         # build list of files
         outdir = self.get('OUTPUT', 'DIRECTORY')
@@ -473,9 +479,9 @@
             out[channel] = dict((form, []) for form in fileformats)
             for seg in segments:
                 basename = '%s_OMICRON-%d-%d' % (cstr, seg[0], abs(seg))
-                for form in fileformats:
+                for form, ext in fileformats.items():
                     out[channel][form].append(os.path.join(
-                        outdir, channel, '%s.%s' % (basename, form)))
+                        outdir, channel, '{0}.{1}'.format(basename, ext)))
         if flatten:  # return a basic list of filenames
             return [f for c in out for form in out[c] for f in out[c][form]]
         return out
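To illustrate the effect of the new mapping (channel name and GPS times hypothetical): with OUTPUT FORMAT set to 'root xml hdf5', output_files() now also returns an 'hdf5' entry per channel whose paths use the '.h5' extension written by Omicron.exe, e.g.

    out[channel]['hdf5'] == [
        '<outdir>/<channel>/<cstr>_OMICRON-1000000000-64.h5',
    ]

whereas the 'root' and 'xml' entries keep using the format name as the extension; this is why the format key ('hdf5') and the file extension ('h5') are now tracked separately.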
1 change: 1 addition & 0 deletions requirements.txt
@@ -9,3 +9,4 @@ lalsuite
 dqsegdb
 gwpy
 htcondor
+h5py
1 change: 1 addition & 0 deletions setup.py
@@ -57,6 +57,7 @@
     'dqsegdb',
     'gwpy',
     'python-ligo-lw >= 1.4.0',
+    'h5py',
 ]
 tests_require = [
     'pytest',