Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved data access for shared-memory files #63

Merged
merged 5 commits into from
Jun 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 10 additions & 34 deletions bin/omicron-process
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,6 @@ if args.gps is None and args.cache_file is not None:

# extract key variables
ifo = args.ifo
llhoft = data.ligo_low_latency_hoft_type(ifo, use_devshm=args.use_dev_shm)
group = args.group
online = args.gps is None

Expand Down Expand Up @@ -478,19 +477,17 @@ segfile = os.path.join(rundir, 'segments.txt')
keepfiles.append(segfile)

if newdag and online:
if frametype == '%s_HOFT_C00' % ifo:
end = data.get_latest_data_gps(ifo, llhoft)
else:
end = data.get_latest_data_gps(ifo, frametype)
end -= (chunkdur + padding)
try:
# get limit of available data (allowing for padding)
end = data.get_latest_data_gps(ifo, frametype) - padding

try: # start from where we got to last time
start = segments.get_last_run_segment(segfile)[1]
except IOError:
if llhoft.endswith('llhoft'):
except IOError: # otherwise start with a sensible amount of data
if args.use_dev_shm: # process one chunk
logger.debug("No online segment record, starting with "
"%s seconds" % chunkdur)
start = end - chunkdur + padding
else:
else: # process the last 4000 seconds (arbitrarily)
logger.debug("No online segment record, starting with "
"4000 seconds")
start = end - 4000
Expand All @@ -514,28 +511,6 @@ logger.info("Duration = %d seconds" % dataduration)

span = (start, end)

# -- double-check frametype for h(t)
# don't use aggregated h(t) if running online

if not args.cache_file:
try:
data.check_data_availability(ifo, '%s_HOFT_C00' % ifo, start, end)
except RuntimeError:
use_online_hoft = True
msg = "Gaps found in %s availability, turning to %s" % (
frametype, llhoft)
else:
use_online_hoft = False

if frametype == '%s_HOFT_C00' % ifo and use_online_hoft:
if online:
logger.debug(msg)
else:
logger.warning(msg)
frametype = llhoft
if statechannel and use_online_hoft and stateft == '%s_HOFT_C00' % ifo:
stateft = llhoft

# -- find segments and frame files --------------------------------------------

# minimum allowed duration is one full chunk
Expand Down Expand Up @@ -626,10 +601,11 @@ dataspan = type(segs)([segments.Segment(datastart, dataend)])
if args.cache_file:
with open(args.cache_file, 'r') as fobj:
cache = Cache.fromfile(fobj)
elif args.use_dev_shm:
elif args.use_dev_shm and len(segs): # only cache if we have state segments
cache = data.find_frames(ifo, frametype, datastart, dataend,
on_gaps='warn', tmpdir=cachedir)
tempfiles.extend(cache.pfnlist())
# remove cached files at end of process
tempfiles.extend(filter(lambda p: cachedir in p, cache.pfnlist()))
# find frames using datafind
else:
cache = data.find_frames(ifo, frametype, datastart, dataend,
Expand Down
154 changes: 90 additions & 64 deletions omicron/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from glue.lal import (Cache, CacheEntry)
from glue.segments import (segment as Segment, segmentlist as SegmentList)

re_ll = re.compile('\A[HL]1_ll')
re_ll = re.compile('_(llhoft|lldetchar)\Z')
re_gwf_gps_epoch = re.compile('[-\/](?P<gpsepoch>\d+)$')


Expand Down Expand Up @@ -103,25 +103,27 @@ def get_latest_data_gps(obs, frametype, connection=None):
gpstime : `int`
the GPS time marking the end of the latest frame
"""
if re_ll.match(frametype):
latest = _find_ll_frames(obs, frametype)[-1]
try:
latest = connection.find_latest(obs[0], frametype, urltype='file',
on_missing='error')[0]
try:
ngps = len(re_gwf_gps_epoch.search(
os.path.dirname(latest.path)).groupdict()['gpsepoch'])
except AttributeError:
pass
if re_ll.search(frametype):
latest = _find_ll_frames(obs, frametype)[-1]
else:
while True:
s, e = latest.segment
new = latest.path.replace('-%d-' % s, '-%d-' % e)
new = new.replace('%s/' % str(s)[:ngps], '%s/' % str(e)[:ngps])
if os.path.isfile(new):
latest = CacheEntry.from_T050017(new)
else:
break
latest = connection.find_latest(obs[0], frametype, urltype='file',
on_missing='error')[0]
try:
ngps = len(re_gwf_gps_epoch.search(
os.path.dirname(latest.path)).groupdict()['gpsepoch'])
except AttributeError:
pass
else:
while True:
s, e = latest.segment
new = latest.path.replace('-%d-' % s, '-%d-' % e)
new = new.replace('%s/' % str(s)[:ngps],
'%s/' % str(e)[:ngps])
if os.path.isfile(new):
latest = CacheEntry.from_T050017(new)
else:
break
except IndexError as e:
e.args = ('No %s-%s frames found' % (obs[0], frametype),)
raise
Expand Down Expand Up @@ -174,8 +176,8 @@ def check_data_availability(obs, frametype, start, end, connection=None):
connection.find_frame_urls(obs[0], frametype, start, end, on_gaps='error')


def find_frames(obs, frametype, start, end, connection=None, on_gaps='warn',
                **kwargs):
    """Find all frames for the given frametype in the GPS interval

    Parameters
    ----------
    obs : `str`
        observatory/interferometer prefix (e.g. ``'L1'``) — only the
        first character is passed to datafind
    frametype : `str`
        name of the frame type to find
    start : `int`
        GPS start time of the search interval
    end : `int`
        GPS end time of the search interval
    connection : optional
        an open datafind connection, passed through to the datafind query
    on_gaps : `str`, optional
        action when the found files don't cover ``[start, end)``:
        ``'warn'`` (default) emits a `UserWarning`, ``'ignore'`` does
        nothing, anything else raises a `RuntimeError`
    **kwargs
        other keyword arguments are passed to the datafind query;
        ``tmpdir`` and ``root`` are popped out and forwarded to
        `find_ll_frames` for low-latency frame types

    Returns
    -------
    cache : `~glue.lal.Cache`
        a cache of frame file locations
    """
    # pop keywords that only apply to the low-latency (shared-memory) search
    ll_kw = {key: kwargs.pop(key) for key in ('tmpdir', 'root',) if
             key in kwargs}

    # find files using datafind
    cache = _find_frames_datafind(obs, frametype, start, end,
                                  connection=connection, on_gaps='ignore',
                                  **kwargs)

    # find more files for low-latency under /dev/shm (or similar)
    if re_ll.search(frametype):
        try:
            latest = cache[-1].segment[1]
        except IndexError:  # datafind found nothing, search the whole span
            latest = start
        if latest < end:
            cache.extend(find_ll_frames(obs, frametype, latest, end, **ll_kw))

    # handle missing files
    if on_gaps != 'ignore':
        seglist = SegmentList(e.segment for e in cache).coalesce()
        missing = (SegmentList([Segment(start, end)]) - seglist).coalesce()
        msg = "Missing segments:\n%s" % '\n'.join(map(str, missing))
        if missing and on_gaps == 'warn':
            warnings.warn(msg)
        elif missing:
            raise RuntimeError(msg)

    return cache


@with_datafind_connection
def _find_frames_datafind(obs, frametype, start, end, connection=None,
                          **kwargs):
    """Query datafind for frame files, extending past the index if possible

    Queries the open datafind ``connection`` for ``[start, end)``, then —
    because the datafind server can lag behind the filesystem — walks
    forward from the newest returned file, appending contiguous files
    that exist on disk but aren't indexed yet.

    Returns a `~glue.lal.Cache` of the files found (possibly empty).
    """
    kwargs.setdefault('urltype', 'file')
    cache = connection.find_frame_urls(obs[0], frametype, start, end,
                                       **kwargs)

    # use latest frame to find more recent frames that aren't in
    # datafind yet, this is quite hacky, and isn't guaranteed to
    # work at any point, but it shouldn't break anything

    try:
        latest = cache[-1]
    except IndexError:  # no frames
        return cache

    try:  # find number of GPS digits in paths
        ngps = len(re_gwf_gps_epoch.search(
            os.path.dirname(latest.path)).groupdict()['gpsepoch'])
    except AttributeError:  # no match
        return cache

    while True:
        s, e = latest.segment
        if s >= end:  # don't go beyond requested times
            break
        # predict the name of the next contiguous file and test for it
        new = latest.path.replace('-%d-' % s, '-%d-' % e)
        new = new.replace('%s/' % str(s)[:ngps], '%s/' % str(e)[:ngps])
        if os.path.isfile(new):
            latest = CacheEntry.from_T050017(new)
            cache.append(latest)
        else:
            break
    return cache


def write_cache(cache, outfile):
    """Write *cache* to the file at path ``outfile``."""
    with open(outfile, 'w') as fobj:
        cache.tofile(fobj)


def find_ll_frames(ifo, frametype, start, end, root='/dev/shm',
on_gaps='warn', tmpdir=None):
def find_ll_frames(ifo, frametype, start, end, root='/dev/shm', tmpdir=None):
"""Find all buffered low-latency frames in the given interval

Parameters
Expand Down Expand Up @@ -277,14 +311,6 @@ def find_ll_frames(ifo, frametype, start, end, root='/dev/shm',
"""
seg = Segment(start, end)
cache = _find_ll_frames(ifo, frametype, root=root).sieve(segment=seg)
if on_gaps != 'ignore':
seglist = SegmentList(e.segment for e in cache).coalesce()
missing = (SegmentList([seg]) - seglist).coalesce()
msg = "Missing segments:\n%s" % '\n'.join(map(str, missing))
if missing and on_gaps == 'warn':
warnings.warn(msg)
elif missing:
raise RuntimeError(msg)
if tmpdir:
out = []
if not os.path.isdir(tmpdir):
Expand All @@ -298,11 +324,11 @@ def find_ll_frames(ifo, frametype, start, end, root='/dev/shm',
return cache


def _find_ll_frames(ifo, frametype, root='/dev/shm', ext='gwf'):
    """Glob the low-latency shared-memory directory for frame files

    Files are expected under ``<root>/<tag>/<ifo>/`` where ``tag`` is the
    trailing ``_``-separated component of ``frametype`` (e.g. ``llhoft``
    for ``L1_llhoft``), with names ``<obs>-<frametype>-*-*.<ext>``.

    Returns a `~glue.lal.Cache` of all matches except the most recent one.
    """
    obs = ifo[0]
    # 'L1_llhoft' -> ['L1', 'llhoft'] -> <root>/llhoft/L1
    bits = frametype.rsplit('_', 1)
    basedir = os.path.join(root, *bits[::-1])
    globstr = os.path.join(basedir, '{obs}-{frametype}-*-*.{ext}'.format(
        obs=obs, frametype=frametype, ext=ext))
    # don't return the last file, as it might not have been fully written yet
    return Cache.from_urls(sorted(glob.glob(globstr)[:-1]))
6 changes: 1 addition & 5 deletions omicron/segments.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,7 @@ def get_state_segments(channel, frametype, start, end, bits=[0], nproc=1,
pend = end + pad[1]

# find frame cache
if data.re_ll.match(frametype):
tmpdir = mkdtemp(prefix='tmp-pyomicron-')
cache = data.find_frames(ifo, frametype, pstart, pend, tmpdir=tmpdir)
else:
cache = data.find_frames(ifo, frametype, pstart, pend)
cache = data.find_frames(ifo, frametype, pstart, pend)

# optimise I/O based on type and library
io_kw = {}
Expand Down