diff --git a/daliuge-engine/dlg/drop.py b/daliuge-engine/dlg/drop.py index 5c78b6549..7456aaf87 100644 --- a/daliuge-engine/dlg/drop.py +++ b/daliuge-engine/dlg/drop.py @@ -1199,7 +1199,7 @@ def path(self) -> str: # @param dataclass Data Class/my.awesome.data.Component/String/ComponentParameter/readonly//False/False/The python class that implements this data component # @param data_volume Data volume/5/Float/ComponentParameter/readwrite//False/False/Estimated size of the data contained in this node # @param group_end Group end/False/Boolean/ComponentParameter/readwrite//False/False/Is this node the end of a group? -# @param streaming Streaming/False/Boolean/ComponentParameter/readwrite//False/False/Specifies whether this data component streams input and output data +# @param streaming Streaming/False/Boolean/ComponentParameter/readwrite//False/False/Specifies whether this data component streams input and output data # @param persist Persist/False/Boolean/ComponentParameter/readwrite//False/False/Specifies whether this data component contains data that should not be deleted after execution # @param dummy dummy//Object/InputPort/readwrite//False/False/Dummy input port # @param dummy dummy//Object/OutputPort/readwrite//False/False/Dummy output port @@ -1931,11 +1931,18 @@ def async_execute(self): # Return immediately, but schedule the execution of this app # If we have been given a thread pool use that if hasattr(self, "_tp"): - self._tp.apply_async(self.execute) + self._tp.apply_async(self._execute_and_log_exception) else: - t = threading.Thread(target=self.execute) + t = threading.Thread(target=self._execute_and_log_exception) t.daemon = 1 t.start() + return t + + def _execute_and_log_exception(self): + try: + self.execute() + except: + logger.exception("Unexpected exception during drop (%r) execution", self) _dlg_proc_lock = threading.Lock() diff --git a/daliuge-engine/test/test_input_fired_app_drop.py b/daliuge-engine/test/test_input_fired_app_drop.py new file mode 100644 index 000000000..475d0152e --- /dev/null +++ b/daliuge-engine/test/test_input_fired_app_drop.py @@ -0,0 +1,42 @@ +import threading +from dlg.drop import InputFiredAppDROP +from dlg.event import Event, EventHandler +import pytest + + +class MockThrowingDrop(InputFiredAppDROP): + def run(): + raise RuntimeError("Drop throw") + + +class MockThrowingHandler(EventHandler): + def handleEvent(self, e: Event) -> None: + raise RuntimeError("Handler throw") + + +def test_async_execute_catches_and_logs_unexpected_exception( + caplog: pytest.LogCaptureFixture, +): + drop = MockThrowingDrop("t", "t", n_effective_inputs=1) + handler = MockThrowingHandler() + drop.subscribe(handler) + + thread = drop.async_execute() + assert isinstance(thread, threading.Thread) + thread.join() + + assert "Handler throw" in caplog.text + # execute should handle exceptions in the run method + assert "Drop throw" not in caplog.text + + +def test_execute_propogates_unexpected_exception(): + drop = MockThrowingDrop("t", "t", n_effective_inputs=1) + handler = MockThrowingHandler() + drop.subscribe(handler) + + with pytest.raises(RuntimeError) as e: + drop.execute() + + assert "Handler throw" in str(e.value) + assert "Drop throw" not in str(e.value)