Skip to content

Commit

Permalink
Condor checkpointing for omega scans (#284)
Browse files Browse the repository at this point in the history
* Introduce a tool that allows loading properties of the loudest time-frequency tile from a previously-stored record
* Full condor checkpointing for omega scans
* Unit tests for new config method
* Add --ignore-checkpoint to gwdetchar-omega-batch
* gwdetchar-omega-batch: calls to logger aren't compatible with pycondor, fall back to print statements
* Refactor and add a unit test
  • Loading branch information
Alex L. Urban committed Mar 29, 2019
1 parent 0e45fe5 commit 409a456
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 18 deletions.
46 changes: 38 additions & 8 deletions bin/gwdetchar-omega
Expand Up @@ -51,6 +51,7 @@ import sys
import warnings
import numpy

from gwpy.table import Table
from gwpy.time import to_gps

from matplotlib import use
Expand Down Expand Up @@ -80,6 +81,9 @@ parser.add_argument('-f', '--config-file', action='append', default=None,
parser.add_argument('-d', '--disable-correlation', action='store_true',
default=False, help='disable cross-correlation of aux '
'channels, default: False')
parser.add_argument('-D', '--disable-checkpoint', action='store_true',
default=False, help='disable checkpointing from previous '
'runs, default: False')
parser.add_argument('-s', '--ignore-state-flags', action='store_true',
default=False, help='ignore state flag definitions in '
'the configuration, default: False')
Expand Down Expand Up @@ -153,7 +157,7 @@ outdir = os.path.abspath(outdir)
if not os.path.isdir(outdir):
os.makedirs(outdir)
os.chdir(outdir)
logger.debug("Output directory created as {}".format(outdir))
logger.debug('Output directory created as {}'.format(outdir))


# -- Compute Qscan ------------------------------------------------------------
Expand All @@ -166,6 +170,23 @@ for d in [plotdir, aboutdir, datadir]:
if not os.path.isdir(d):
os.makedirs(d)

# determine checkpoints
summary = os.path.join(datadir, 'summary.csv')
if os.path.exists(summary) and not args.disable_checkpoint:
logger.debug('Checkpointing from {}'.format(
os.path.abspath(summary)))
record = Table.read(summary)
completed = list(record['Channel'])
if not args.disable_correlation and ('Standard Deviation'
not in record.colnames):
raise KeyError(
'Cross-correlation is not available from this record, '
'consider running without correlation or starting from '
'scratch with --disable-checkpoint')
else:
record = []
completed = []

# set up html output
logger.debug('Setting up HTML at {}/index.html'.format(outdir))
html.write_qscan_page(ifo, gps, analyzed, **htmlv)
Expand Down Expand Up @@ -217,9 +238,21 @@ for block in blocks.values():
chans, start, end, frametype=block.frametype, source=block.source,
nproc=args.nproc, verbose='Reading block:'.rjust(30))

# scan individual channels
# process individual channels
for channel in block.channels:
try:
if channel.name in completed: # load checkpoint
logger.info(' -- Checkpointing {} from a previous '
'run'.format(channel.name))
cindex = completed.index(channel.name)
channel.load_loudest_tile_features(
record[cindex], correlated=correlate is not None)
analyzed = html.update_toc(analyzed, channel,
name=blocks[channel.section].name)
htmlv['toc'] = analyzed
html.write_qscan_page(ifo, gps, analyzed, **htmlv)
continue

try: # scan the channel
logger.info(' -- Scanning channel {}'.format(channel.name))
series = omega.scan(
gps, channel, data[channel.name].astype('float64'), fftlength,
Expand Down Expand Up @@ -248,11 +281,8 @@ for block in blocks.values():
else:
channel.save_loudest_tile_features(series[3])

try: # update analyzed dict
analyzed[channel.section]['channels'].append(channel)
except KeyError:
analyzed[channel.section] = {'name': blocks[channel.section].name,
'channels': [channel]}
analyzed = html.update_toc(analyzed, channel,
name=blocks[channel.section].name)
htmlv['toc'] = analyzed
html.write_qscan_page(ifo, gps, analyzed, **htmlv)

Expand Down
14 changes: 8 additions & 6 deletions bin/gwdetchar-omega-batch
Expand Up @@ -67,6 +67,9 @@ parser.add_argument('--colormap', default='viridis',
parser.add_argument('-d', '--disable-correlation', action='store_true',
default=False, help='disable cross-correlation of aux '
'channels, default: False')
parser.add_argument('-D', '--disable-checkpoint', action='store_true',
default=False, help='disable checkpointing from previous '
'runs, default: False')
parser.add_argument('-t', '--far-threshold', type=float, default=3.171e-8,
help='white noise false alarm rate threshold (Hz) for '
'processing channels, default: %(default)s')
Expand Down Expand Up @@ -103,9 +106,6 @@ cargs.add_argument('--condor-command', action='append', default=[],

args = parser.parse_args()

# set up logger
logger = cli.logger(name=os.path.basename(__file__))

# set up output directory
outdir = os.path.abspath(os.path.expanduser(args.output_dir))

Expand Down Expand Up @@ -189,6 +189,8 @@ arguments.extend(("--far-threshold", str(args.far_threshold)))
arguments.extend(("--nproc", str(args.nproc)))
if args.disable_correlation:
arguments.append("--disable-correlation")
if args.disable_checkpoint:
arguments.append("--disable-checkpoint")
if args.ignore_state_flags:
arguments.append("--ignore-state-flags")

Expand All @@ -200,17 +202,17 @@ for t in times:

# write DAG
dagman.build(fancyname=False)
logger.info("Workflow generated for {} times".format(len(times)))
print("Workflow generated for {} times".format(len(times)))
if args.submit:
dagman.submit_dag(submit_options="-force")
else:
logger.info(
print(
"Submit to condor via:\n\n"
"$ condor_submit_dag {0.submit_file}".format(dagman),
)

if args.submit and args.monitor:
logger.info("Monitoring progress of {0.submit_file}".format(dagman))
print("Monitoring progress of {0.submit_file}".format(dagman))
try:
subprocess.check_call(
["pycondor", "monitor", dagman.submit_file],
Expand Down
32 changes: 32 additions & 0 deletions gwdetchar/omega/config.py
Expand Up @@ -322,6 +322,38 @@ def save_loudest_tile_features(self, qgram, correlate=None, gps=0, dt=0.1):
delay = (corr.t0 + corr.argmax() * corr.dt).value - gps
self.delay = int(delay * 1000) # convert to ms

def load_loudest_tile_features(self, table, correlated=False):
    """Load properties of the loudest time-frequency tile from a table

    Parameters
    ----------
    table : `~gwpy.table.Table`
        table of properties of the loudest time-frequency tile; it is
        only ever indexed by column name, so a single row of such a
        table (which is what the checkpointing code passes) works too

    correlated : `bool`, optional
        boolean switch to determine if cross-correlation properties are
        included, default: `False`

    Notes
    -----
    Attributes stored in-place include `Q`, `energy`, `snr`, `t`, and `f`,
    read from the columns ``'Q'``, ``'Energy'``, ``'SNR'``,
    ``'Central Time'``, and ``'Central Frequency (Hz)'`` of `table`.

    If `correlated` is `True` then the maximum correlation amplitude,
    relative time delay, and standard deviation are stored as attributes
    `corr`, `delay`, and `stdev`, read from the columns
    ``'Correlation'``, ``'Delay (ms)'``, and ``'Standard Deviation'``.

    Whatever error `table` raises for a missing column is propagated,
    and in that case no attribute of this object is modified.
    """
    # (attribute, column) pairs that are always loaded
    columns = [
        ('Q', 'Q'),
        ('energy', 'Energy'),
        ('snr', 'SNR'),
        ('t', 'Central Time'),
        ('f', 'Central Frequency (Hz)'),
    ]
    if correlated:
        columns.extend([
            ('corr', 'Correlation'),
            ('stdev', 'Standard Deviation'),
            ('delay', 'Delay (ms)'),
        ])
    # read every value before storing any, so that a missing column
    # leaves this object untouched rather than partially updated
    values = [(attr, table[column]) for attr, column in columns]
    for attr, value in values:
        setattr(self, attr, value)


class OmegaChannelList(object):
"""A conceptual list of `OmegaChannel` objects with common signal
Expand Down
36 changes: 33 additions & 3 deletions gwdetchar/omega/html.py
Expand Up @@ -110,6 +110,35 @@

# -- HTML construction --------------------------------------------------------

def update_toc(toc, channel, name='GW'):
    """Add a channel to the page table of contents

    Parameters
    ----------
    toc : `dict`
        dictionary used as table of contents for a bootstrap page,
        keyed by section; this dictionary is modified in place

    channel : `OmegaChannel`
        channel to be added to `toc`, filed under `channel.section`

    name : `str`, optional
        name of a channel's block, default: `'GW'`; only used when the
        channel's section is not yet present in `toc`

    Returns
    -------
    out : `dict`
        the updated dictionary, which is the same object as `toc`
        (not a copy)
    """
    try:  # section already listed: append to its channels
        toc[channel.section]['channels'].append(channel)
    except KeyError:  # first channel seen for this section
        toc[channel.section] = {
            'name': name,
            'channels': [channel],
        }
    return toc


def navbar(ifo, gpstime, toc={}):
"""Initialise a new `markup.page`
This method constructs an HTML page with the following structure
Expand Down Expand Up @@ -314,7 +343,7 @@ def write_summary_table(blocks, correlated, base=os.path.curdir):
# record summary data for each channel
channel, time, freq, Q, energy, snr = ([], [], [], [], [], [])
if correlated:
corr, delay = ([], [])
corr, stdev, delay = ([], [], [])
for block in blocks.values():
for chan in block['channels']:
channel.append(chan.name)
Expand All @@ -325,13 +354,14 @@ def write_summary_table(blocks, correlated, base=os.path.curdir):
snr.append(chan.snr)
if correlated:
corr.append(chan.corr)
stdev.append(chan.stdev)
delay.append(chan.delay)
# store in a table
if correlated:
data = Table([channel, time, freq, Q, energy, snr, corr, delay],
data = Table([channel, time, freq, Q, energy, snr, corr, stdev, delay],
names=('Channel', 'Central Time',
'Central Frequency (Hz)', 'Q', 'Energy', 'SNR',
'Correlation', 'Delay (ms)'))
'Correlation', 'Standard Deviation', 'Delay (ms)'))
else:
data = Table([channel, time, freq, Q, energy, snr], names=(
'Channel', 'Central Time', 'Central Frequency (Hz)', 'Q', 'Energy',
Expand Down
19 changes: 19 additions & 0 deletions gwdetchar/omega/tests/test_config.py
Expand Up @@ -29,6 +29,7 @@
import numpy
from scipy import signal

from gwpy.table import Table
from gwpy.timeseries import TimeSeries

from .. import (config, core)
Expand Down Expand Up @@ -77,6 +78,11 @@
PRIMARY = BLOCKS['primary']
GW = BLOCKS['GW']

ROWS = [[0]] * 8
COLS = ('Q', 'Energy', 'SNR', 'Central Time', 'Central Frequency (Hz)',
'Correlation', 'Standard Deviation', 'Delay (ms)')
TABLE = Table(ROWS, names=COLS)


# -- test utilities -----------------------------------------------------------

Expand Down Expand Up @@ -202,3 +208,16 @@ def test_save_loudest_tile_features():
assert channel.corr == numpy.around(glitch.max().value, 1)
assert channel.delay == 0.0
assert channel.stdev == glitch.std().value


def test_load_loudest_tile_features():
    """Check that every tile feature is loaded from the all-zero `TABLE`
    """
    channel = GW.channels[0]
    channel.load_loudest_tile_features(TABLE, correlated=True)
    # same attributes, in the same order, as the individual assertions
    loaded = ('Q', 'energy', 'snr', 't', 'f', 'corr', 'delay', 'stdev')
    for attr in loaded:
        assert getattr(channel, attr) == 0
11 changes: 10 additions & 1 deletion gwdetchar/omega/tests/test_html.py
Expand Up @@ -100,7 +100,7 @@
[GW]
; name of this block, which contains h(t)
name = Gravitational Wave Strain
name = Gravitational-Wave Strain
q-range = 3.3166,150.0
frequency-range = 4.0,1024
resample = 4096
Expand Down Expand Up @@ -229,6 +229,15 @@

# -- unit tests ---------------------------------------------------------------

def test_update_toc():
    """Check that a table of contents built from scratch matches `ANALYZED`
    """
    toc = html.update_toc(dict(), GW.channels[0], GW.name)
    assert toc.keys() == ANALYZED.keys()
    # compare the single 'GW' section entry by entry
    built, expected = toc['GW'], ANALYZED['GW']
    assert built.keys() == expected.keys()
    assert len(built['channels']) == len(expected['channels'])
    assert built['channels'][0].name == expected['channels'][0].name
    assert built['name'] == expected['name']


def test_navbar():
page = html.navbar('L1', 0, toc=ANALYZED)
assert parse_html(str(page)) == parse_html(HTML_HEADER.format(
Expand Down

0 comments on commit 409a456

Please sign in to comment.