Skip to content

Commit

Permalink
Merge pull request #87 from duncanmmacleod/guardian-segments
Browse files Browse the repository at this point in the history
Added support for parsing segments from guardian states
  • Loading branch information
Duncan Macleod committed Mar 5, 2019
2 parents e6a30d8 + 7b1e564 commit b8d8ded
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 22 deletions.
24 changes: 19 additions & 5 deletions bin/omicron-process
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ minduration = 1 * chunkdur
# validate span is long enough
if dataduration < minduration and online:
logger.info("Segment is too short (%d < %d), please try again later"
% (duration, chunkdur - padding * 2))
% (duration, minduration))
clean_exit(0, tempfiles)
elif dataduration < minduration:
raise ValueError("Segment [%d, %d) is too short (%d < %d), please "
Expand All @@ -537,9 +537,23 @@ elif dataduration < minduration:
if (online and statechannel) or (statechannel and not stateflag) or (
statechannel and args.no_segdb):
logger.info("Finding segments for relevant state...")
segs = segments.get_state_segments(statechannel, stateft,
datastart, dataend, bits=statebits,
pad=statepad)
if statebits == "guardian": # use guardian
segs = segments.get_guardian_segments(
statechannel,
stateft,
datastart,
dataend,
pad=statepad,
)
else:
segs = segments.get_state_segments(
statechannel,
stateft,
datastart,
dataend,
bits=statebits,
pad=statepad,
)
# get segments from segment database
elif stateflag:
logger.info("Querying segments for relevant state...")
Expand Down Expand Up @@ -610,7 +624,7 @@ elif args.use_dev_shm and len(segs): # only cache if we have state segments
cache = data.find_frames(ifo, frametype, datastart, dataend,
on_gaps='warn', tmpdir=cachedir)
# remove cached files at end of process
tempfiles.extend(filter(lambda p: cachedir in p, cache.pfnlist()))
tempfiles.extend(filter(lambda p: cachedir in p, cache))
# find frames using datafind
else:
cache = data.find_frames(ifo, frametype, datastart, dataend,
Expand Down
92 changes: 77 additions & 15 deletions omicron/segments.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,20 @@
from functools import wraps

from glue.lal import Cache
from ligo.segments.utils import fromsegwizard
from ligo.segments import (segmentlist as SegmentList, segment as Segment)

from dqsegdb2.query import DEFAULT_SEGMENT_SERVER
from dqsegdb2.http import request as dqsegdb2_request

from gwpy.io.cache import (cache_segments as _cache_segments, file_segment)
from gwpy.segments import DataQualityFlag
from gwpy.timeseries import StateVector
from gwpy.segments import (DataQualityFlag, SegmentList)
from gwpy.timeseries import (StateTimeSeries, StateVector, TimeSeriesDict)

from . import (const, data, utils)

STATE_CHANNEL = {
'H1:DMT-GRD_ISC_LOCK_NOMINAL:1': ('H1:GRD-ISC_LOCK_OK', [0], 'H1_R'),
'L1:DMT-GRD_ISC_LOCK_NOMINAL:1': ('L1:GRD-ISC_LOCK_OK', [0], 'L1_R'),
'H1:DMT-GRD_ISC_LOCK_NOMINAL:1': ('H1:GRD-ISC_LOCK', "guardian", 'H1_R'),
'L1:DMT-GRD_ISC_LOCK_NOMINAL:1': ('L1:GRD-ISC_LOCK', "guardian", 'L1_R'),
'H1:DMT-UP:1': ('H1:GDS-CALIB_STATE_VECTOR', [2], 'H1_HOFT_C00'),
'L1:DMT-UP:1': ('L1:GDS-CALIB_STATE_VECTOR', [2], 'L1_HOFT_C00'),
'H1:DMT-CALIBRATED:1': ('H1:GDS-CALIB_STATE_VECTOR', [0], 'H1_HOFT_C00'),
Expand All @@ -53,7 +52,7 @@
'H1_HOFT_C00'),
'L1:DMT-ANALYSIS_READY:1': ('L1:GDS-CALIB_STATE_VECTOR', [0, 1, 2],
'L1_HOFT_C00'),
'V1:ITF_LOCKED:1': ('V1:DQ_ANALYSIS_STATE_VECTOR', [2], 'V1_llhoft'),
'V1:ITF_LOCKED:1': ('V1:DQ_ANALYSIS_STATE_VECTOR', [11], 'V1_llhoft'),
'V1:ITF_SCIENCE:1': ('V1:DQ_ANALYSIS_STATE_VECTOR', [0, 1, 2], 'V1_llhoft'),
}
RAW_TYPE_REGEX = re.compile('[A-Z]1_R')
Expand All @@ -71,29 +70,34 @@ def decorated_method(*args, **kwargs):


def read_segments(filename, coltype=int):
with open(filename, 'r') as fp:
return fromsegwizard(fp, coltype=coltype)
return SegmentList.read(
filename,
gpstype=coltype,
format="segwizard",
)


def get_last_run_segment(segfile):
return read_segments(segfile, coltype=int)[-1]


def write_segments(segmentlist, outfile, coltype=int):
with open(outfile, 'w') as fp:
for seg in segmentlist:
print('%d %d' % seg, file=fp)
return SegmentList(segmentlist).write(
outfile,
coltype=coltype,
format="segwizard",
)


@integer_segments
def query_state_segments(flag, start, end, url=DEFAULT_SEGMENT_SERVER,
pad=(0, 0)):
"""Query a segment database for active segments associated with a flag
"""
segs = DataQualityFlag.query(flag, start-pad[0], end+pad[1], url=url).pad(
pad[0], -pad[1]) # DQF.pad pads forward in time at end
segs.coalesce()
return segs.active
# NOTE: DQF.pad pads forward in time at end
return DataQualityFlag.query(
flag, start-pad[0], end+pad[1], url=url,
).coalesce().pad(pad[0], -pad[1]).active


@integer_segments
Expand Down Expand Up @@ -157,6 +161,64 @@ def get_frame_segments(obs, frametype, start, end):
return cache_segments(cache) & span


@integer_segments
def get_guardian_segments(node, frametype, start, end, nproc=1, pad=(0, 0),
strict=False):
"""Determine state segments for a given guardian node
"""
ifo, node = node.split(':', 1)
if node.startswith('GRD-'):
node = node[4:]
pstart = start - pad[0]
pend = end + pad[1]

# find frame cache
cache = data.find_frames(ifo, frametype, pstart, pend)

# pre-format data segments
span = SegmentList([Segment(pstart, pend)])
segs = SegmentList()
csegs = cache_segments(cache)
if not csegs:
return csegs

# read data
stub = "{}:GRD-{}".format(ifo, node)
if strict:
channels = ["{}_OK".format(stub)]
else:
state = "{}_STATE_N".format(stub)
nominal = "{}_NOMINAL_N".format(stub)
active = "{}_ACTIVE".format(stub)
channels = [state, nominal, active]
for seg in csegs & span:
if strict:
sv = StateVector.read(
cache, channels[0], nproc=nproc, start=seg[0], end=seg[1],
bits=[0], gap='pad', pad=0,).astype('uint32')
segs += sv.to_dqflags().intersection().active
else:
gdata = TimeSeriesDict.read(
cache, channels, nproc=nproc, start=seg[0], end=seg[1],
gap='pad', pad=0)
ok = ((gdata[state].value == gdata[nominal].value) &
(gdata[active].value == 1)).view(StateTimeSeries)
ok.t0 = gdata[state].t0
ok.dt = gdata[state].dt
segs += ok.to_dqflag().active

# truncate to integers, and apply padding
for i, seg in enumerate(segs):
segs[i] = type(seg)(int(ceil(seg[0])) + pad[0],
int(floor(seg[1])) - pad[1])
segs.coalesce()

# clean up and return
if data.re_ll.match(frametype):
shutil.rmtree(tmpdir)
return segs.coalesce()


@integer_segments
def cache_segments(cache):
return _cache_segments(cache).coalesce()
Expand Down
86 changes: 86 additions & 0 deletions omicron/tests/test_segments.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# -*- coding: utf-8 -*-
# Copyright (C) Duncan Macleod (2019)
#
# This file is part of PyOmicron.
#
# PyOmicron is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PyOmicron is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with PyOmicron. If not, see <http://www.gnu.org/licenses/>.

"""Tests for omicron.segments
"""

import tempfile
from copy import deepcopy
try:
from unittest import mock
from io import StringIO
except ImportError: # python < 3
from StringIO import StringIO
import mock

import pytest

from gwpy.segments import (DataQualityFlag, Segment, SegmentList)

from .. import segments


@pytest.fixture
def seglist():
return SegmentList([
Segment(0, 1),
Segment(1, 2),
Segment(3, 4),
])


def test_read_write_segments(seglist):
with tempfile.NamedTemporaryFile(mode="w") as tmp:
segments.write_segments(seglist, tmp)
tmp.seek(0)
segs = segments.read_segments(tmp.name)
assert segs == seglist


def test_get_last_run_segment(seglist):
tmp = StringIO()
segments.write_segments(seglist, tmp)
tmp.seek(0)
assert segments.get_last_run_segment(tmp) == seglist[-1]


def test_query_state_segments(seglist):
with mock.patch(
"omicron.segments.DataQualityFlag.query",
return_value=DataQualityFlag(active=deepcopy(seglist),
known=[seglist.extent()]),
):
coal = deepcopy(seglist).coalesce()
assert segments.query_state_segments('X', 0, 10) == coal
assert segments.query_state_segments(
'X', 0, 10,
pad=(1, 1),
) == DataQualityFlag(active=coal, known=[coal.extent()]).pad(1, -1).active


@mock.patch(
"omicron.data.find_frames",
return_value=["/path/to/A-B-0-10.gwf", "/path/to/C-D-20-10.gwf"],
)
def test_get_frame_segments(find):
assert segments.get_frame_segments("X", "X1_R", 0, 100) == SegmentList([
Segment(0, 10), Segment(20, 30),
])
assert segments.get_frame_segments("X", "X1_R", 25, 100) == SegmentList([
Segment(25, 30),
])
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ python-ligo-lw >= 1.4.0
ligo-segments
lalsuite
dqsegdb2
gwpy >=0.13.0
gwpy >=0.14.0
htcondor
h5py
gwdatafind
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
'htcondor',
'lalsuite',
'dqsegdb2',
'gwpy >= 0.13.0',
'gwpy >= 0.14.0',
'python-ligo-lw >= 1.4.0',
'h5py',
'gwdatafind',
Expand Down

0 comments on commit b8d8ded

Please sign in to comment.