Skip to content

Commit

Permalink
Use ErrThreadMixin to bubble up exceptions from ReaderThread
Browse files Browse the repository at this point in the history
  • Loading branch information
sk1p authored and uellue committed Jul 27, 2021
1 parent 83c1d57 commit 877d451
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 9 deletions.
35 changes: 35 additions & 0 deletions src/libertem_live/detectors/common.py
@@ -0,0 +1,35 @@
import threading
import logging
from typing import Optional

logger = logging.getLogger(__name__)


class StoppableThreadMixin:
def __init__(self, *args, **kwargs):
self._stop_event = threading.Event()
super().__init__(*args, **kwargs)

def stop(self):
self._stop_event.set()

def is_stopped(self):
return self._stop_event.is_set()


class ErrThreadMixin(StoppableThreadMixin):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._error: Optional[Exception] = None

def get_error(self):
return self._error

def error(self, exc):
logger.error("got exception %r, shutting down thread", exc)
self._error = exc
self.stop()

def maybe_raise(self):
if self._error is not None:
raise self._error
21 changes: 12 additions & 9 deletions src/libertem_live/detectors/merlin/data.py
Expand Up @@ -11,6 +11,7 @@
from libertem.io.dataset.base.decode import byteswap_2_decode

from libertem_live.detectors.base.acquisition import AcquisitionTimeout
from libertem_live.detectors.common import ErrThreadMixin

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -481,8 +482,7 @@ def release(self):
self._consumed.set()


# FIXME: use ErrThreadMixin and maybe_raise in the pool
class ReaderThread(threading.Thread):
class ReaderThread(ErrThreadMixin, threading.Thread):
def __init__(self, backend, out_queue, chunk_size, sig_shape, default_timeout=0.2,
read_dtype=np.float32, read_upto_frame=None, *args, **kwargs):
super().__init__(name='ReaderThread', *args, **kwargs)
Expand All @@ -496,12 +496,6 @@ def __init__(self, backend, out_queue, chunk_size, sig_shape, default_timeout=0.
self._read_upto_frame = read_upto_frame
self._sig_shape = sig_shape

def stop(self):
self._stop_event.set()

def is_stopped(self):
return self._stop_event.is_set()

def __enter__(self):
self.start()
return self
Expand Down Expand Up @@ -553,6 +547,9 @@ def run(self):
if self.is_stopped(): # break out of outer loop
should_exit = True
break
except Exception as e:
self.error(e)
raise
finally:
self.stop() # make sure the stopped flag is set in any case

Expand Down Expand Up @@ -583,13 +580,18 @@ def __enter__(self):

def __exit__(self, *args, **kwargs):
logger.debug("ReaderPoolImpl.__exit__: stopping threads")
for t in self._threads: # TODO: handle errors on stopping/joining? re-throw exceptions?
for t in self._threads:
t.stop()
logger.debug("ReaderPoolImpl: stop signal set")
t.join()
t.maybe_raise()
logger.debug("ReaderPoolImpl: thread joined")
logger.debug("ReaderPoolImpl.__exit__: threads stopped")

def _maybe_raise(self):
for t in self._threads:
t.maybe_raise()

@contextlib.contextmanager
def get_result(self):
while True:
Expand All @@ -602,6 +604,7 @@ def get_result(self):
if self.should_stop():
yield None
return
self._maybe_raise()

def should_stop(self):
return any(
Expand Down

0 comments on commit 877d451

Please sign in to comment.