Commit 37edde5
Merge pull request #73 from duncanmmacleod/hdf5
Added support for handling HDF5 files created by Omicron.exe
Duncan Macleod committed Nov 5, 2018
2 parents 5a9fd0b + b828c5c
Showing 7 changed files with 183 additions and 31 deletions.
1 change: 1 addition & 0 deletions .travis.yml
@@ -29,6 +29,7 @@ script:
- python -m coverage run -m pytest omicron/
- python -m coverage run -a $(which omicron-process) --help
- python -m coverage run -a $(which omicron-status) --help
- python -m coverage run -a $(which omicron-hdf5-merge) --help

after_success:
- python -m pip install coveralls
46 changes: 46 additions & 0 deletions bin/omicron-hdf5-merge
@@ -0,0 +1,46 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) Duncan Macleod (2018)
#
# This file is part of LIGO-Omicron.
#
# LIGO-Omicron is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# LIGO-Omicron is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with LIGO-Omicron. If not, see <http://www.gnu.org/licenses/>.

"""Merge HDF5 files into one
"""

import os.path
import argparse

from omicron import (io, __version__)

__author__ = 'Duncan Macleod <duncan.macleod@ligo.org>'

parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('-V', '--version', action='version', version=__version__)
parser.add_argument('filename', nargs='+', help='file to merge')
parser.add_argument('output', help='output file name')
parser.add_argument('-d', '--remove-input', action='store_true', default=False,
                    help='remove input files after writing output, '
                         'default: %(default)s')

args = parser.parse_args()
io.merge_hdf5_files(args.filename, args.output)

# remove input files
if args.remove_input:
    for f in args.filename:
        if os.path.samefile(f, args.output):
            continue
        os.remove(f)
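
A typical invocation of this new executable, following the argument parser
above (file names hypothetical), merges two per-chunk trigger files into one
and then removes the inputs:

    omicron-hdf5-merge chunk1.h5 chunk2.h5 merged.h5 --remove-input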
70 changes: 45 additions & 25 deletions bin/omicron-process
@@ -223,13 +223,17 @@ pipeg.add_argument('--skip-omicron', action='store_true', default=False,
pipeg.add_argument('--skip-root-merge', action='store_true', default=False,
                   help='skip running omicron-root-merge, '
                        'default: %(default)s')
pipeg.add_argument('--skip-hdf5-merge', action='store_true', default=False,
                   help='skip running omicron-hdf5-merge, '
                        'default: %(default)s')
pipeg.add_argument('--skip-ligolw_add', action='store_true', default=False,
                   help='skip running ligolw_add, default: %(default)s')
pipeg.add_argument('--skip-gzip', action='store_true', default=False,
                   help='skip running gzip, default: %(default)s')
pipeg.add_argument('--skip-postprocessing', action='store_true', default=False,
                   help='skip all post-processing, equivalent to '
-                       '--skip-root-merge --skip-lioglw_add --skip-gzip, '
+                       '--skip-root-merge --skip-hdf5-merge '
+                       '--skip-ligolw_add --skip-gzip, '
                        'default: %(default)s')

args = parser.parse_args()
@@ -247,12 +251,13 @@ if args.executable is None:
                 "--executable on the command line")

# validate processing options
-if all((args.skip_root_merge, args.skip_ligolw_add, args.skip_gzip,
-        not args.archive)):
+if all((args.skip_root_merge, args.skip_hdf5_merge, args.skip_ligolw_add,
+        args.skip_gzip, not args.archive)):
    args.skip_postprocessing = True
if args.archive:
    argsd = vars(args)
-    for arg in ['skip-root-merge', 'skip-ligolw-add', 'skip-gzip']:
+    for arg in ['skip-root-merge', 'skip-hdf5-merge',
+                'skip-ligolw-add', 'skip-gzip']:
        if argsd[arg.replace('-', '_')]:
            parser.error("Cannot use --%s with --archive" % arg)

@@ -717,6 +722,7 @@ ppjob.add_condor_cmd('+OmicronPostProcess', '"%s"' % group)
ppjob.add_short_opt('e', '')
ppnodes = []
rootmerge = utils.which('omicron-root-merge')
hdf5merge = utils.which('omicron-hdf5-merge')
ligolw_add = utils.which('ligolw_add')
gzip = utils.which('gzip')

@@ -819,6 +825,28 @@ for s, e in segs:
                    archivefiles[target] = [root]
                rmfiles.append(root)

        # add HDF5 operations
        if 'hdf5' in fileformats:
            hdf5files = ' '.join(omicronfiles[c]['hdf5'])
            for f in omicronfiles[c]['hdf5']:
                ppnode._CondorDAGNode__input_files.append(f)
            if args.skip_hdf5_merge or (
                    len(omicronfiles[c]['hdf5']) == 1):
                hdf5 = hdf5files
            else:
                hdf5 = os.path.join(
                    chandir, filename.replace('root', 'h5'))
                operations.append('{cmd} {infiles} {outfile}'.format(
                    cmd=hdf5merge, infiles=hdf5files, outfile=hdf5))
                rmfiles.append(hdf5files)
            ppnode._CondorDAGNode__output_files.append(hdf5)
            if args.archive:
                try:
                    archivefiles[target].append(hdf5)
                except KeyError:
                    archivefiles[target] = [hdf5]
                rmfiles.append(hdf5)
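
Each string appended to operations above expands at run time into a shell
command of the following form (paths hypothetical), invoking the new merger
on all per-chunk HDF5 files for a channel:

    /path/to/omicron-hdf5-merge chunk1.h5 chunk2.h5 /path/to/chandir/CHANNEL_OMICRON-0-100.h5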

        # add LIGO_LW operations
        if 'xml' in fileformats:
            xmlfiles = ' '.join(omicronfiles[c]['xml'])
@@ -895,9 +923,7 @@ if omicronv >= 'v2r2':
# do all archiving last, once all post-processing has completed
if args.archive:
    archivenode = pipeline.CondorDAGNode(archivejob)
-    xmlcache = Cache()
-    rootcache = Cache()
-    txtcache = Cache()
+    acache = {fmt: Cache() for fmt in fileformats}
    if newdag:
        # write shell script to seed archive
        with open(archivejob.get_executable(), 'w') as f:
@@ -913,26 +939,20 @@ if args.archive:
                             x in filelist]
            for fn in filenames:
                archivenode._CondorDAGNode__output_files.append(fn)
-            xmlcache.extend(CacheEntry.from_T050017(x) for x in filenames
-                            if x.endswith(('xml.gz', 'xml')))
-            rootcache.extend(CacheEntry.from_T050017(x) for x in filenames
-                             if x.endswith('root'))
-            txtcache.extend(CacheEntry.from_T050017(x) for x in filenames
-                            if x.endswith('txt'))
+            for fmt, extensions in {
+                'xml': ('xml.gz', '.xml'),
+                'root': 'root',
+                'hdf5': 'h5',
+                'txt': 'txt',
+            }.items():
+                acache[fmt].extend(map(CacheEntry.from_T050017, filter(
+                    lambda x: x.endswith(extensions), filenames)))
        os.chmod(archivejob.get_executable(), 0o755)
        # write caches to disk
-        if 'xml' in fileformats:
-            xmlcachefile = os.path.join(cachedir, 'omicron-xml.lcf')
-            data.write_cache(xmlcache, xmlcachefile)
-            logger.debug("XML cache written to %s" % xmlcachefile)
-        if 'root' in fileformats:
-            rootcachefile = os.path.join(cachedir, 'omicron-root.lcf')
-            data.write_cache(rootcache, rootcachefile)
-            logger.debug("ROOT cache written to %s" % rootcachefile)
-        if 'txt' in fileformats:
-            txtcachefile = os.path.join(cachedir, 'omicron-txt.lcf')
-            data.write_cache(txtcache, txtcachefile)
-            logger.debug("ASCII cache written to %s" % txtcachefile)
+        for fmt, fcache in acache.items():
+            cachefile = os.path.join(cachedir, 'omicron-{0}.lcf'.format(fmt))
+            data.write_cache(fcache, cachefile)
+            logger.debug("{0} cache written to {1}".format(fmt, cachefile))
    # add node to DAG
    for node in ppnodes:
        archivenode.add_parent(node)
77 changes: 77 additions & 0 deletions omicron/io.py
@@ -23,6 +23,9 @@
import os.path
import glob
import re
from collections import defaultdict

import numpy

from glue.lal import Cache

@@ -224,3 +227,77 @@ def get_archive_filename(channel, start, duration, ext='xml.gz',
    else:
        gps5 = str(int(start))[:5]
    return os.path.join(archive, ifo, description, gps5, filename)


def merge_hdf5_files(inputfiles, outputfile, **compression_kw):
    """Merge several HDF5 files into a single file

    Parameters
    ----------
    inputfiles : `list` of `str`
        the paths of the input HDF5 files to merge

    outputfile : `str`
        the path of the output HDF5 file to write

    **compression_kw
        compression options (e.g. ``compression``, ``compression_opts``)
        to use when creating the output datasets; any option not given
        is copied from the matching dataset in the input files

    Notes
    -----
    This method requires the `h5py <https://www.h5py.org/>`_ package.
    """
    import h5py

    # get list of datasets
    attributes = {}
    datasets = {}
    for path in inputfiles:
        with h5py.File(path, 'r') as h5f:
            attributes = dict(h5f.attrs)
            for dset in h5f:
                # assert datatype is the same, and append shape
                shape = h5f[dset].shape
                dtype = h5f[dset].dtype
                try:
                    shape = numpy.sum(datasets[dset][0] + shape, keepdims=True)
                except KeyError:
                    chunk = shape
                else:
                    assert dtype == datasets[dset][1], (
                        "Cannot merge {0}/{1}, does not match dtype".format(
                            path, dset))
                    if chunk != datasets[dset][2]:
                        chunk = True
                datasets[dset] = (shape, dtype, chunk)

                # use default compression options from this file
                for copt in ('compression', 'compression_opts'):
                    compression_kw.setdefault(copt, getattr(h5f[dset], copt))

    # combine sets
    position = defaultdict(int)
    with h5py.File(outputfile, 'w') as h5out:
        # copy attributes (just from last file)
        h5out.attrs.update(attributes)
        # create datasets
        for dset, (shape, dtype, chunk) in datasets.items():
            h5out.create_dataset(dset, shape=shape, dtype=dtype,
                                 chunks=chunk, **compression_kw)
        # copy dataset contents
        for path in inputfiles:
            with h5py.File(path, 'r') as h5in:
                for dset in datasets:
                    data = h5in[dset]
                    size = data.shape[0]
                    pos = position[dset]
                    h5out[dset][pos:pos + size] = data
                    position[dset] += size

    return outputfile
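
The merger can also be driven directly from Python; a minimal sketch (paths
and compression settings illustrative only), where any compression option
not given falls back to that of the input datasets:

    from omicron import io

    # merge two per-chunk trigger files (hypothetical paths) into one,
    # requesting gzip compression on the merged datasets
    merged = io.merge_hdf5_files(
        ['chunk1.h5', 'chunk2.h5'],
        'merged.h5',
        compression='gzip',
        compression_opts=9,
    )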
18 changes: 12 additions & 6 deletions omicron/parameters.py
@@ -67,6 +67,9 @@ class OmicronParameters(configparser.ConfigParser):
    OMICRON_DEFAULTS['v2r2'] = {
        'PARAMETER': {'FFTPLAN': 'FFTW_ESTIMATE'},
    }
    OMICRON_DEFAULTS['v2r3'] = {
        'OUTPUT': {'FORMAT': 'root xml hdf5'},
    }

    def __init__(self, version=None, defaults=dict(), **kwargs):
        configparser.ConfigParser.__init__(self, defaults=defaults, **kwargs)
@@ -460,10 +463,13 @@ def output_files(self, start, end, flatten=False):
        segments = self.output_segments(start, end)

        # parse list of file formats
-        fileformats = []
-        for form in ['root', 'txt', 'xml']:
-            if form in self.get('OUTPUT', 'FORMAT'):
-                fileformats.append(form)
+        fileformats = {
+            form: ext for (form, ext) in {
+                'root': 'root',
+                'txt': 'txt',
+                'xml': 'xml',
+                'hdf5': 'h5',
+            }.items() if form in self.get('OUTPUT', 'FORMAT')}

        # build list of files
        outdir = self.get('OUTPUT', 'DIRECTORY')
@@ -473,9 +479,9 @@
            out[channel] = dict((form, []) for form in fileformats)
            for seg in segments:
                basename = '%s_OMICRON-%d-%d' % (cstr, seg[0], abs(seg))
-                for form in fileformats:
+                for form, ext in fileformats.items():
                    out[channel][form].append(os.path.join(
-                        outdir, channel, '%s.%s' % (basename, form)))
+                        outdir, channel, '{0}.{1}'.format(basename, ext)))
        if flatten:  # return a basic list of filenames
            return [f for c in out for form in out[c] for f in out[c][form]]
        return out
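
To illustrate the new mapping, for a hypothetical channel with one
100-second segment and FORMAT = 'root xml hdf5', output_files would return
a structure like the following, where <outdir> and <cstr> stand for the
configured output directory and the sanitised channel string computed above
the excerpt; note that the 'hdf5' entries carry the '.h5' file extension:

    {
        'L1:GDS-CALIB_STRAIN': {
            'root': ['<outdir>/L1:GDS-CALIB_STRAIN/<cstr>_OMICRON-1000000000-100.root'],
            'xml': ['<outdir>/L1:GDS-CALIB_STRAIN/<cstr>_OMICRON-1000000000-100.xml'],
            'hdf5': ['<outdir>/L1:GDS-CALIB_STRAIN/<cstr>_OMICRON-1000000000-100.h5'],
        },
    }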
1 change: 1 addition & 0 deletions requirements.txt
@@ -9,3 +9,4 @@ lalsuite
dqsegdb
gwpy
htcondor
h5py
1 change: 1 addition & 0 deletions setup.py
@@ -57,6 +57,7 @@
    'dqsegdb',
    'gwpy',
    'python-ligo-lw >= 1.4.0',
    'h5py',
]
tests_require = [
    'pytest',
