Skip to content

Commit

Permalink
Merge pull request #63 from duncanmmacleod/o2-replay
Browse files Browse the repository at this point in the history
Improved data access for shared-memory files
  • Loading branch information
Duncan Macleod committed Jun 18, 2018
2 parents 846a5c8 + fbe2592 commit f6b9e77
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 103 deletions.
44 changes: 10 additions & 34 deletions bin/omicron-process
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,6 @@ if args.gps is None and args.cache_file is not None:

# extract key variables
ifo = args.ifo
llhoft = data.ligo_low_latency_hoft_type(ifo, use_devshm=args.use_dev_shm)
group = args.group
online = args.gps is None

Expand Down Expand Up @@ -478,19 +477,17 @@ segfile = os.path.join(rundir, 'segments.txt')
keepfiles.append(segfile)

if newdag and online:
if frametype == '%s_HOFT_C00' % ifo:
end = data.get_latest_data_gps(ifo, llhoft)
else:
end = data.get_latest_data_gps(ifo, frametype)
end -= (chunkdur + padding)
try:
# get limit of available data (allowing for padding)
end = data.get_latest_data_gps(ifo, frametype) - padding

try: # start from where we got to last time
start = segments.get_last_run_segment(segfile)[1]
except IOError:
if llhoft.endswith('llhoft'):
except IOError: # otherwise start with a sensible amount of data
if args.use_dev_shm: # process one chunk
logger.debug("No online segment record, starting with "
"%s seconds" % chunkdur)
start = end - chunkdur + padding
else:
else: # process the last 4000 seconds (arbitrarily)
logger.debug("No online segment record, starting with "
"4000 seconds")
start = end - 4000
Expand All @@ -514,28 +511,6 @@ logger.info("Duration = %d seconds" % dataduration)

span = (start, end)

# -- double-check frametype for h(t)
# don't use aggregated h(t) if running online

if not args.cache_file:
try:
data.check_data_availability(ifo, '%s_HOFT_C00' % ifo, start, end)
except RuntimeError:
use_online_hoft = True
msg = "Gaps found in %s availability, turning to %s" % (
frametype, llhoft)
else:
use_online_hoft = False

if frametype == '%s_HOFT_C00' % ifo and use_online_hoft:
if online:
logger.debug(msg)
else:
logger.warning(msg)
frametype = llhoft
if statechannel and use_online_hoft and stateft == '%s_HOFT_C00' % ifo:
stateft = llhoft

# -- find segments and frame files --------------------------------------------

# minimum allowed duration is one full chunk
Expand Down Expand Up @@ -626,10 +601,11 @@ dataspan = type(segs)([segments.Segment(datastart, dataend)])
if args.cache_file:
with open(args.cache_file, 'r') as fobj:
cache = Cache.fromfile(fobj)
elif args.use_dev_shm:
elif args.use_dev_shm and len(segs): # only cache if we have state segments
cache = data.find_frames(ifo, frametype, datastart, dataend,
on_gaps='warn', tmpdir=cachedir)
tempfiles.extend(cache.pfnlist())
# remove cached files at end of process
tempfiles.extend(filter(lambda p: cachedir in p, cache.pfnlist()))
# find frames using datafind
else:
cache = data.find_frames(ifo, frametype, datastart, dataend,
Expand Down
154 changes: 90 additions & 64 deletions omicron/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from glue.lal import (Cache, CacheEntry)
from glue.segments import (segment as Segment, segmentlist as SegmentList)

re_ll = re.compile('\A[HL]1_ll')
re_ll = re.compile('_(llhoft|lldetchar)\Z')
re_gwf_gps_epoch = re.compile('[-\/](?P<gpsepoch>\d+)$')


Expand Down Expand Up @@ -103,25 +103,27 @@ def get_latest_data_gps(obs, frametype, connection=None):
gpstime : `int`
the GPS time marking the end of the latest frame
"""
if re_ll.match(frametype):
latest = _find_ll_frames(obs, frametype)[-1]
try:
latest = connection.find_latest(obs[0], frametype, urltype='file',
on_missing='error')[0]
try:
ngps = len(re_gwf_gps_epoch.search(
os.path.dirname(latest.path)).groupdict()['gpsepoch'])
except AttributeError:
pass
if re_ll.search(frametype):
latest = _find_ll_frames(obs, frametype)[-1]
else:
while True:
s, e = latest.segment
new = latest.path.replace('-%d-' % s, '-%d-' % e)
new = new.replace('%s/' % str(s)[:ngps], '%s/' % str(e)[:ngps])
if os.path.isfile(new):
latest = CacheEntry.from_T050017(new)
else:
break
latest = connection.find_latest(obs[0], frametype, urltype='file',
on_missing='error')[0]
try:
ngps = len(re_gwf_gps_epoch.search(
os.path.dirname(latest.path)).groupdict()['gpsepoch'])
except AttributeError:
pass
else:
while True:
s, e = latest.segment
new = latest.path.replace('-%d-' % s, '-%d-' % e)
new = new.replace('%s/' % str(s)[:ngps],
'%s/' % str(e)[:ngps])
if os.path.isfile(new):
latest = CacheEntry.from_T050017(new)
else:
break
except IndexError as e:
e.args = ('No %s-%s frames found' % (obs[0], frametype),)
raise
Expand Down Expand Up @@ -174,8 +176,8 @@ def check_data_availability(obs, frametype, start, end, connection=None):
connection.find_frame_urls(obs[0], frametype, start, end, on_gaps='error')


@with_datafind_connection
def find_frames(obs, frametype, start, end, connection=None, **kwargs):
def find_frames(obs, frametype, start, end, connection=None, on_gaps='warn',
**kwargs):
"""Find all frames for the given frametype in the GPS interval
Parameters
Expand All @@ -199,46 +201,78 @@ def find_frames(obs, frametype, start, end, connection=None, **kwargs):
cache : `~glue.lal.Cache`
a cache of frame file locations
"""
if re_ll.match(frametype):
return find_ll_frames(obs, frametype, start, end, **kwargs)
else:
kwargs.setdefault('urltype', 'file')
cache = connection.find_frame_urls(obs[0], frametype, start, end,
**kwargs)
# use latest frame to find more recent frames that aren't in
# datafind yet, this is quite hacky, and isn't guaranteed to
# work at any point, but it shouldn't break anything
try:
latest = cache[-1]
except IndexError: # no frames
return cache
ll_kw = {key: kwargs.pop(key) for key in ('tmpdir', 'root',) if
key in kwargs}

# find files using datafind
cache = _find_frames_datafind(obs, frametype, start, end,
connection=connection, on_gaps='ignore',
**kwargs)

# find more files for low-latency under /dev/shm (or similar)
if re_ll.search(frametype):
try:
ngps = len(re_gwf_gps_epoch.search(
os.path.dirname(latest.path)).groupdict()['gpsepoch'])
except AttributeError: # no match
pass
latest = cache[-1].segment[1]
except IndexError:
latest = start
if latest < end:
cache.extend(find_ll_frames(obs, frametype, latest, end, **ll_kw))

# handle missing files
if on_gaps != 'ignore':
seglist = SegmentList(e.segment for e in cache).coalesce()
missing = (SegmentList([Segment(start, end)]) - seglist).coalesce()
msg = "Missing segments:\n%s" % '\n'.join(map(str, missing))
if missing and on_gaps == 'warn':
warnings.warn(msg)
elif missing:
raise RuntimeError(msg)

return cache


@with_datafind_connection
def _find_frames_datafind(obs, frametype, start, end, connection=None,
                          **kwargs):
    """Query datafind for frame URLs, extending past the server's knowledge.

    Parameters are as for `find_frames`; ``connection`` is an open
    datafind connection (injected by the ``with_datafind_connection``
    decorator when not given).

    Returns a cache of frame file locations; may be empty if datafind
    knows of no frames for this type in the interval.
    """
    kwargs.setdefault('urltype', 'file')
    cache = connection.find_frame_urls(obs[0], frametype, start, end,
                                       **kwargs)

    # use latest frame to find more recent frames that aren't in
    # datafind yet, this is quite hacky, and isn't guaranteed to
    # work at any point, but it shouldn't break anything

    try:
        latest = cache[-1]
    except IndexError:  # no frames
        return cache

    try:  # find number of GPS digits in paths
        ngps = len(re_gwf_gps_epoch.search(
            os.path.dirname(latest.path)).groupdict()['gpsepoch'])
    except AttributeError:  # no match
        return cache

    while True:
        s, e = latest.segment
        if s >= end:  # don't go beyond requested times
            break
        # guess the path of the next contiguous frame file by swapping
        # the GPS start time (and GPS epoch directory) for the end time
        new = latest.path.replace('-%d-' % s, '-%d-' % e)
        new = new.replace('%s/' % str(s)[:ngps], '%s/' % str(e)[:ngps])
        if os.path.isfile(new):  # file exists on disk, record and continue
            latest = CacheEntry.from_T050017(new)
            cache.append(latest)
        else:  # no more frames on disk
            break
    return cache


def write_cache(cache, outfile):
    """Write *cache* to the file at path *outfile* using its own writer."""
    with open(outfile, 'w') as fileobj:
        cache.tofile(fileobj)


def find_ll_frames(ifo, frametype, start, end, root='/dev/shm',
on_gaps='warn', tmpdir=None):
def find_ll_frames(ifo, frametype, start, end, root='/dev/shm', tmpdir=None):
"""Find all buffered low-latency frames in the given interval
Parameters
Expand Down Expand Up @@ -277,14 +311,6 @@ def find_ll_frames(ifo, frametype, start, end, root='/dev/shm',
"""
seg = Segment(start, end)
cache = _find_ll_frames(ifo, frametype, root=root).sieve(segment=seg)
if on_gaps != 'ignore':
seglist = SegmentList(e.segment for e in cache).coalesce()
missing = (SegmentList([seg]) - seglist).coalesce()
msg = "Missing segments:\n%s" % '\n'.join(map(str, missing))
if missing and on_gaps == 'warn':
warnings.warn(msg)
elif missing:
raise RuntimeError(msg)
if tmpdir:
out = []
if not os.path.isdir(tmpdir):
Expand All @@ -298,11 +324,11 @@ def find_ll_frames(ifo, frametype, start, end, root='/dev/shm',
return cache


def _find_ll_frames(ifo, frametype, root='/dev/shm', ext='gwf'):
    """Glob for low-latency frame files under ``root``.

    ``frametype`` like ``'H1_llhoft'`` maps to files matching
    ``<root>/llhoft/H1/H-H1_llhoft-*-*.gwf`` (the trailing component of
    the frametype names the directory under ``root``, then the IFO).

    Returns a cache of matching files, excluding the newest one because
    it might not have been fully written yet.
    """
    obs = ifo[0]
    # split 'H1_llhoft' -> ['H1', 'llhoft'], reversed to build
    # <root>/llhoft/H1
    bits = frametype.rsplit('_', 1)
    basedir = os.path.join(root, *bits[::-1])
    globstr = os.path.join(basedir, '{obs}-{frametype}-*-*.{ext}'.format(
        obs=obs, frametype=frametype, ext=ext))
    # don't return the last file, as it might not have been fully written yet
    return Cache.from_urls(sorted(glob.glob(globstr)[:-1]))
6 changes: 1 addition & 5 deletions omicron/segments.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,7 @@ def get_state_segments(channel, frametype, start, end, bits=[0], nproc=1,
pend = end + pad[1]

# find frame cache
if data.re_ll.match(frametype):
tmpdir = mkdtemp(prefix='tmp-pyomicron-')
cache = data.find_frames(ifo, frametype, pstart, pend, tmpdir=tmpdir)
else:
cache = data.find_frames(ifo, frametype, pstart, pend)
cache = data.find_frames(ifo, frametype, pstart, pend)

# optimise I/O based on type and library
io_kw = {}
Expand Down

0 comments on commit f6b9e77

Please sign in to comment.