Skip to content

Commit

Permalink
Merge branch 'master' into key_delim
Browse files Browse the repository at this point in the history
  • Loading branch information
tunnell committed Sep 1, 2018
2 parents da71de7 + 72e82e6 commit d5df346
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 9 deletions.
8 changes: 2 additions & 6 deletions strax/mailbox.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from concurrent.futures import Future, TimeoutError
import heapq
import sys
import threading
import typing
import logging
Expand Down Expand Up @@ -164,12 +165,7 @@ def _send_from(self, iterable):
self.kill(reason=e.args[0])
# Do NOT raise! One traceback on the screen is enough.
except Exception as e:
try:
e_args = e.args[0]
except IndexError:
e_args = ''
self.kill(reason=f"{e.__class__.__name__}('{e_args}') "
f'in {threading.current_thread().name}')
self.kill(reason=(e.__class__, e, sys.exc_info()[2]))
raise
else:
self.close()
Expand Down
11 changes: 10 additions & 1 deletion strax/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,24 @@ def iter(self):
self.log.debug(f"Yielding {target}")
try:
yield from final_generator
traceback = None
exc = None
except strax.MailboxKilled as e:
self.log.debug(f"Target Mailbox ({target}) killed")
for m in self.mailboxes.values():
if m != target:
self.log.debug(f"Killing {m}")
m.kill(upstream=True,
reason=e.args[0])
raise
_, exc, traceback = e.args[0]
finally:
self.log.debug("Closing threads")
for m in self.mailboxes.values():
m.cleanup()

# Reraise exception. This is outside the except block
# to avoid the 'during handling of this exception, another
# exception occurred' stuff from confusing the traceback
# which is printed for the user
if traceback is not None:
raise exc.with_traceback(traceback)
14 changes: 12 additions & 2 deletions strax/storage/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ def __init__(self, at, message=''):
self.at = at


@export
class CorruptedData(Exception):
pass


@export
class RunMetadataNotAvailable(Exception):
pass
Expand Down Expand Up @@ -297,7 +302,7 @@ def loader(self, backend_key, n_range=None, executor=None):
"""
metadata = self.get_metadata(backend_key)
if not len(metadata['chunks']):
self.log.warning(f"No actual data in {backend_key}?")
raise CorruptedData(f"No chunks of data in {backend_key}")
dtype = literal_eval(metadata['dtype'])
compressor = metadata['compressor']

Expand Down Expand Up @@ -409,7 +414,12 @@ def close(self, wait_for=None, timeout=120):
self.closed = True

exc_info = sys.exc_info()
if exc_info[0] not in [None, StopIteration]:
if exc_info[0] == strax.MailboxKilled:
# Get the original exception back out, and put that
# in the metadata
self.md['exception'] = '\n'.join(
traceback.format_exception(*exc_info[1].args[0]))
elif exc_info[0] not in [None, StopIteration]:
self.md['exception'] = traceback.format_exc()
self.md['writing_ended'] = time.time()

Expand Down
29 changes: 29 additions & 0 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,17 @@
import strax


@strax.takes_config(
strax.Option('crash', default=False)
)
class Records(strax.Plugin):
provides = 'records'
depends_on = tuple()
dtype = strax.record_dtype()

def iter(self, *args, **kwargs):
if self.config['crash']:
raise SomeCrash("CRASH!!!!")
for t in range(n_chunks):
r = np.zeros(recs_per_chunk, self.dtype)
r['time'] = t
Expand All @@ -26,6 +31,10 @@ def iter(self, *args, **kwargs):
yield r


class SomeCrash(Exception):
pass


@strax.takes_config(
strax.Option('some_option', default=0)
)
Expand Down Expand Up @@ -147,6 +156,22 @@ def test_storage_converter():
store_2.find(key)


def test_exception():
with tempfile.TemporaryDirectory() as temp_dir:
st = strax.Context(storage=strax.DataDirectory(temp_dir),
register=[Records, Peaks],
config=dict(crash=True))

# Check correct exception is thrown
with pytest.raises(SomeCrash):
st.make(run_id=run_id, targets='peaks')

# Check exception is recorded in metadata
# in both its original data type and dependents
for target in ('peaks', 'records'):
assert 'SomeCrash' in st.get_meta(run_id, target)['exception']


def test_random_access():
"""Test basic random access
TODO: test random access when time info is not provided directly
Expand All @@ -157,6 +182,7 @@ def test_random_access():

st = strax.Context(storage=strax.DataDirectory(temp_dir),
register=[Records, Peaks])

with pytest.raises(strax.DataNotAvailable):
# Time range selection requires data already available
st.get_df(run_id, 'peaks', time_range=(3, 5))
Expand All @@ -168,6 +194,9 @@ def test_random_access():
str(st._key_for(run_id, 'peaks')),
'000000'))

with pytest.raises(FileNotFoundError):
st.get_array(run_id, 'peaks')

df = st.get_array(run_id, 'peaks', time_range=(3, 5))
assert len(df) == 2 * recs_per_chunk
assert df['time'].min() == 3
Expand Down

0 comments on commit d5df346

Please sign in to comment.