Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for parsing segments from guardian states #87

Merged
merged 7 commits into from
Mar 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions bin/omicron-process
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ minduration = 1 * chunkdur
# validate span is long enough
if dataduration < minduration and online:
logger.info("Segment is too short (%d < %d), please try again later"
% (duration, chunkdur - padding * 2))
% (duration, minduration))
clean_exit(0, tempfiles)
elif dataduration < minduration:
raise ValueError("Segment [%d, %d) is too short (%d < %d), please "
Expand All @@ -537,9 +537,23 @@ elif dataduration < minduration:
if (online and statechannel) or (statechannel and not stateflag) or (
statechannel and args.no_segdb):
logger.info("Finding segments for relevant state...")
segs = segments.get_state_segments(statechannel, stateft,
datastart, dataend, bits=statebits,
pad=statepad)
if statebits == "guardian": # use guardian
segs = segments.get_guardian_segments(
statechannel,
stateft,
datastart,
dataend,
pad=statepad,
)
else:
segs = segments.get_state_segments(
statechannel,
stateft,
datastart,
dataend,
bits=statebits,
pad=statepad,
)
# get segments from segment database
elif stateflag:
logger.info("Querying segments for relevant state...")
Expand Down Expand Up @@ -610,7 +624,7 @@ elif args.use_dev_shm and len(segs): # only cache if we have state segments
cache = data.find_frames(ifo, frametype, datastart, dataend,
on_gaps='warn', tmpdir=cachedir)
# remove cached files at end of process
tempfiles.extend(filter(lambda p: cachedir in p, cache.pfnlist()))
tempfiles.extend(filter(lambda p: cachedir in p, cache))
# find frames using datafind
else:
cache = data.find_frames(ifo, frametype, datastart, dataend,
Expand Down
92 changes: 77 additions & 15 deletions omicron/segments.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,20 @@
from functools import wraps

from glue.lal import Cache
from ligo.segments.utils import fromsegwizard
from ligo.segments import (segmentlist as SegmentList, segment as Segment)

from dqsegdb2.query import DEFAULT_SEGMENT_SERVER
from dqsegdb2.http import request as dqsegdb2_request

from gwpy.io.cache import (cache_segments as _cache_segments, file_segment)
from gwpy.segments import DataQualityFlag
from gwpy.timeseries import StateVector
from gwpy.segments import (DataQualityFlag, SegmentList)
from gwpy.timeseries import (StateTimeSeries, StateVector, TimeSeriesDict)

from . import (const, data, utils)

STATE_CHANNEL = {
'H1:DMT-GRD_ISC_LOCK_NOMINAL:1': ('H1:GRD-ISC_LOCK_OK', [0], 'H1_R'),
'L1:DMT-GRD_ISC_LOCK_NOMINAL:1': ('L1:GRD-ISC_LOCK_OK', [0], 'L1_R'),
'H1:DMT-GRD_ISC_LOCK_NOMINAL:1': ('H1:GRD-ISC_LOCK', "guardian", 'H1_R'),
'L1:DMT-GRD_ISC_LOCK_NOMINAL:1': ('L1:GRD-ISC_LOCK', "guardian", 'L1_R'),
'H1:DMT-UP:1': ('H1:GDS-CALIB_STATE_VECTOR', [2], 'H1_HOFT_C00'),
'L1:DMT-UP:1': ('L1:GDS-CALIB_STATE_VECTOR', [2], 'L1_HOFT_C00'),
'H1:DMT-CALIBRATED:1': ('H1:GDS-CALIB_STATE_VECTOR', [0], 'H1_HOFT_C00'),
Expand All @@ -53,7 +52,7 @@
'H1_HOFT_C00'),
'L1:DMT-ANALYSIS_READY:1': ('L1:GDS-CALIB_STATE_VECTOR', [0, 1, 2],
'L1_HOFT_C00'),
'V1:ITF_LOCKED:1': ('V1:DQ_ANALYSIS_STATE_VECTOR', [2], 'V1_llhoft'),
'V1:ITF_LOCKED:1': ('V1:DQ_ANALYSIS_STATE_VECTOR', [11], 'V1_llhoft'),
'V1:ITF_SCIENCE:1': ('V1:DQ_ANALYSIS_STATE_VECTOR', [0, 1, 2], 'V1_llhoft'),
}
RAW_TYPE_REGEX = re.compile('[A-Z]1_R')
Expand All @@ -71,29 +70,34 @@ def decorated_method(*args, **kwargs):


def read_segments(filename, coltype=int):
with open(filename, 'r') as fp:
return fromsegwizard(fp, coltype=coltype)
return SegmentList.read(
filename,
gpstype=coltype,
format="segwizard",
)


def get_last_run_segment(segfile):
return read_segments(segfile, coltype=int)[-1]


def write_segments(segmentlist, outfile, coltype=int):
with open(outfile, 'w') as fp:
for seg in segmentlist:
print('%d %d' % seg, file=fp)
return SegmentList(segmentlist).write(
outfile,
coltype=coltype,
format="segwizard",
)


@integer_segments
def query_state_segments(flag, start, end, url=DEFAULT_SEGMENT_SERVER,
pad=(0, 0)):
"""Query a segment database for active segments associated with a flag
"""
segs = DataQualityFlag.query(flag, start-pad[0], end+pad[1], url=url).pad(
pad[0], -pad[1]) # DQF.pad pads forward in time at end
segs.coalesce()
return segs.active
# NOTE: DQF.pad pads forward in time at end
return DataQualityFlag.query(
flag, start-pad[0], end+pad[1], url=url,
).coalesce().pad(pad[0], -pad[1]).active


@integer_segments
Expand Down Expand Up @@ -157,6 +161,64 @@ def get_frame_segments(obs, frametype, start, end):
return cache_segments(cache) & span


@integer_segments
def get_guardian_segments(node, frametype, start, end, nproc=1, pad=(0, 0),
strict=False):
"""Determine state segments for a given guardian node
"""
ifo, node = node.split(':', 1)
if node.startswith('GRD-'):
node = node[4:]
pstart = start - pad[0]
pend = end + pad[1]

# find frame cache
cache = data.find_frames(ifo, frametype, pstart, pend)

# pre-format data segments
span = SegmentList([Segment(pstart, pend)])
segs = SegmentList()
csegs = cache_segments(cache)
if not csegs:
return csegs

# read data
stub = "{}:GRD-{}".format(ifo, node)
if strict:
channels = ["{}_OK".format(stub)]
else:
state = "{}_STATE_N".format(stub)
nominal = "{}_NOMINAL_N".format(stub)
active = "{}_ACTIVE".format(stub)
channels = [state, nominal, active]
for seg in csegs & span:
if strict:
sv = StateVector.read(
cache, channels[0], nproc=nproc, start=seg[0], end=seg[1],
bits=[0], gap='pad', pad=0,).astype('uint32')
segs += sv.to_dqflags().intersection().active
else:
gdata = TimeSeriesDict.read(
cache, channels, nproc=nproc, start=seg[0], end=seg[1],
gap='pad', pad=0)
ok = ((gdata[state].value == gdata[nominal].value) &
(gdata[active].value == 1)).view(StateTimeSeries)
ok.t0 = gdata[state].t0
ok.dt = gdata[state].dt
segs += ok.to_dqflag().active

# truncate to integers, and apply padding
for i, seg in enumerate(segs):
segs[i] = type(seg)(int(ceil(seg[0])) + pad[0],
int(floor(seg[1])) - pad[1])
segs.coalesce()

# clean up and return
if data.re_ll.match(frametype):
shutil.rmtree(tmpdir)
return segs.coalesce()


@integer_segments
def cache_segments(cache):
return _cache_segments(cache).coalesce()
Expand Down
86 changes: 86 additions & 0 deletions omicron/tests/test_segments.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# -*- coding: utf-8 -*-
# Copyright (C) Duncan Macleod (2019)
#
# This file is part of PyOmicron.
#
# PyOmicron is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PyOmicron is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with PyOmicron. If not, see <http://www.gnu.org/licenses/>.

"""Tests for omicron.segments
"""

import tempfile
from copy import deepcopy
try:
from unittest import mock
from io import StringIO
except ImportError: # python < 3
from StringIO import StringIO
import mock

import pytest

from gwpy.segments import (DataQualityFlag, Segment, SegmentList)

from .. import segments


@pytest.fixture
def seglist():
return SegmentList([
Segment(0, 1),
Segment(1, 2),
Segment(3, 4),
])


def test_read_write_segments(seglist):
with tempfile.NamedTemporaryFile(mode="w") as tmp:
segments.write_segments(seglist, tmp)
tmp.seek(0)
segs = segments.read_segments(tmp.name)
assert segs == seglist


def test_get_last_run_segment(seglist):
tmp = StringIO()
segments.write_segments(seglist, tmp)
tmp.seek(0)
assert segments.get_last_run_segment(tmp) == seglist[-1]


def test_query_state_segments(seglist):
with mock.patch(
"omicron.segments.DataQualityFlag.query",
return_value=DataQualityFlag(active=deepcopy(seglist),
known=[seglist.extent()]),
):
coal = deepcopy(seglist).coalesce()
assert segments.query_state_segments('X', 0, 10) == coal
assert segments.query_state_segments(
'X', 0, 10,
pad=(1, 1),
) == DataQualityFlag(active=coal, known=[coal.extent()]).pad(1, -1).active


@mock.patch(
"omicron.data.find_frames",
return_value=["/path/to/A-B-0-10.gwf", "/path/to/C-D-20-10.gwf"],
)
def test_get_frame_segments(find):
assert segments.get_frame_segments("X", "X1_R", 0, 100) == SegmentList([
Segment(0, 10), Segment(20, 30),
])
assert segments.get_frame_segments("X", "X1_R", 25, 100) == SegmentList([
Segment(25, 30),
])
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ python-ligo-lw >= 1.4.0
ligo-segments
lalsuite
dqsegdb2
gwpy >=0.13.0
gwpy >=0.14.0
htcondor
h5py
gwdatafind
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
'htcondor',
'lalsuite',
'dqsegdb2',
'gwpy >= 0.13.0',
'gwpy >= 0.14.0',
'python-ligo-lw >= 1.4.0',
'h5py',
'gwdatafind',
Expand Down