Skip to content

Commit

Permalink
data: cleaned up ll file finding
Browse files Browse the repository at this point in the history
use datafind to find whatever we can, then use /dev/shm for everything else
  • Loading branch information
duncanmmacleod committed Jun 18, 2018
1 parent b92c0ab commit fbe2592
Showing 1 changed file with 64 additions and 40 deletions.
104 changes: 64 additions & 40 deletions omicron/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ def check_data_availability(obs, frametype, start, end, connection=None):
connection.find_frame_urls(obs[0], frametype, start, end, on_gaps='error')


@with_datafind_connection
def find_frames(obs, frametype, start, end, connection=None, **kwargs):
def find_frames(obs, frametype, start, end, connection=None, on_gaps='warn',
**kwargs):
"""Find all frames for the given frametype in the GPS interval
Parameters
Expand All @@ -201,46 +201,78 @@ def find_frames(obs, frametype, start, end, connection=None, **kwargs):
cache : `~glue.lal.Cache`
a cache of frame file locations
"""
ll_kw = {key: kwargs.pop(key) for key in ('tmpdir', 'root',) if
key in kwargs}

# find files using datafind
cache = _find_frames_datafind(obs, frametype, start, end,
connection=connection, on_gaps='ignore',
**kwargs)

# find more files for low-latency under /dev/shm (or similar)
if re_ll.search(frametype):
return find_ll_frames(obs, frametype, start, end, **kwargs)
else:
kwargs.setdefault('urltype', 'file')
cache = connection.find_frame_urls(obs[0], frametype, start, end,
**kwargs)
# use latest frame to find more recent frames that aren't in
# datafind yet, this is quite hacky, and isn't guaranteed to
# work at any point, but it shouldn't break anything
try:
latest = cache[-1]
except IndexError: # no frames
return cache
try:
ngps = len(re_gwf_gps_epoch.search(
os.path.dirname(latest.path)).groupdict()['gpsepoch'])
except AttributeError: # no match
pass
latest = cache[-1].segment[1]
except IndexError:
latest = start
if latest < end:
cache.extend(find_ll_frames(obs, frametype, latest, end, **ll_kw))

# handle missing files
if on_gaps != 'ignore':
seglist = SegmentList(e.segment for e in cache).coalesce()
missing = (SegmentList([Segment(start, end)]) - seglist).coalesce()
msg = "Missing segments:\n%s" % '\n'.join(map(str, missing))
if missing and on_gaps == 'warn':
warnings.warn(msg)
elif missing:
raise RuntimeError(msg)

return cache


@with_datafind_connection
def _find_frames_datafind(obs, frametype, start, end, connection=None,
**kwargs):
kwargs.setdefault('urltype', 'file')
cache = connection.find_frame_urls(obs[0], frametype, start, end,
**kwargs)

# use latest frame to find more recent frames that aren't in
# datafind yet, this is quite hacky, and isn't guaranteed to
# work at any point, but it shouldn't break anything

try:
latest = cache[-1]
except IndexError: # no frames
return cache

try: # find number of GPS digits in paths
ngps = len(re_gwf_gps_epoch.search(
os.path.dirname(latest.path)).groupdict()['gpsepoch'])
except AttributeError: # no match
return cache

while True:
s, e = latest.segment
if s >= end: # dont' go beyond requested times
break
new = latest.path.replace('-%d-' % s, '-%d-' % e)
new = new.replace('%s/' % str(s)[:ngps], '%s/' % str(e)[:ngps])
if os.path.isfile(new):
latest = CacheEntry.from_T050017(new)
cache.append(latest)
else:
while True:
s, e = latest.segment
if s >= end: # dont' go beyond requested times
break
new = latest.path.replace('-%d-' % s, '-%d-' % e)
new = new.replace('%s/' % str(s)[:ngps], '%s/' % str(e)[:ngps])
if os.path.isfile(new):
latest = CacheEntry.from_T050017(new)
cache.append(latest)
else:
break
return cache
break
return cache


def write_cache(cache, outfile):
with open(outfile, 'w') as fp:
cache.tofile(fp)


def find_ll_frames(ifo, frametype, start, end, root='/dev/shm',
on_gaps='warn', tmpdir=None):
def find_ll_frames(ifo, frametype, start, end, root='/dev/shm', tmpdir=None):
"""Find all buffered low-latency frames in the given interval
Parameters
Expand Down Expand Up @@ -279,14 +311,6 @@ def find_ll_frames(ifo, frametype, start, end, root='/dev/shm',
"""
seg = Segment(start, end)
cache = _find_ll_frames(ifo, frametype, root=root).sieve(segment=seg)
if on_gaps != 'ignore':
seglist = SegmentList(e.segment for e in cache).coalesce()
missing = (SegmentList([seg]) - seglist).coalesce()
msg = "Missing segments:\n%s" % '\n'.join(map(str, missing))
if missing and on_gaps == 'warn':
warnings.warn(msg)
elif missing:
raise RuntimeError(msg)
if tmpdir:
out = []
if not os.path.isdir(tmpdir):
Expand Down

0 comments on commit fbe2592

Please sign in to comment.