Skip to content

Commit

Permalink
LIU-322: Log exceptions during async InputFiredAppDrop execution
Browse files Browse the repository at this point in the history
Add test to ensure it is only exceptions outside of normal execution
that get caught and logged here.
  • Loading branch information
juliancarrivick committed Nov 8, 2022
1 parent 0481797 commit b71eae0
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 3 deletions.
13 changes: 10 additions & 3 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -1199,7 +1199,7 @@ def path(self) -> str:
# @param dataclass Data Class/my.awesome.data.Component/String/ComponentParameter/readonly//False/False/The python class that implements this data component
# @param data_volume Data volume/5/Float/ComponentParameter/readwrite//False/False/Estimated size of the data contained in this node
# @param group_end Group end/False/Boolean/ComponentParameter/readwrite//False/False/Is this node the end of a group?
# @param streaming Streaming/False/Boolean/ComponentParameter/readwrite//False/False/Specifies whether this data component streams input and output data
# @param streaming Streaming/False/Boolean/ComponentParameter/readwrite//False/False/Specifies whether this data component streams input and output data
# @param persist Persist/False/Boolean/ComponentParameter/readwrite//False/False/Specifies whether this data component contains data that should not be deleted after execution
# @param dummy dummy//Object/InputPort/readwrite//False/False/Dummy input port
# @param dummy dummy//Object/OutputPort/readwrite//False/False/Dummy output port
Expand Down Expand Up @@ -1931,11 +1931,18 @@ def async_execute(self):
# Return immediately, but schedule the execution of this app
# If we have been given a thread pool use that
if hasattr(self, "_tp"):
self._tp.apply_async(self.execute)
self._tp.apply_async(self._execute_and_log_exception)
else:
t = threading.Thread(target=self.execute)
t = threading.Thread(target=self._execute_and_log_exception)
t.daemon = 1
t.start()
return t

def _execute_and_log_exception(self):
try:
self.execute()
except:
logger.exception("Unexpected exception during drop (%r) execution", self)

_dlg_proc_lock = threading.Lock()

Expand Down
42 changes: 42 additions & 0 deletions daliuge-engine/test/test_input_fired_app_drop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import threading
from dlg.drop import InputFiredAppDROP
from dlg.event import Event, EventHandler
import pytest


class MockThrowingDrop(InputFiredAppDROP):
def run():
raise RuntimeError("Drop throw")


class MockThrowingHandler(EventHandler):
def handleEvent(self, e: Event) -> None:
raise RuntimeError("Handler throw")


def test_async_execute_catches_and_logs_unexpected_exception(
caplog: pytest.LogCaptureFixture,
):
drop = MockThrowingDrop("t", "t", n_effective_inputs=1)
handler = MockThrowingHandler()
drop.subscribe(handler)

thread = drop.async_execute()
assert isinstance(thread, threading.Thread)
thread.join()

assert "Handler throw" in caplog.text
# execute should handle exceptions in the run method
assert "Drop throw" not in caplog.text


def test_execute_propogates_unexpected_exception():
drop = MockThrowingDrop("t", "t", n_effective_inputs=1)
handler = MockThrowingHandler()
drop.subscribe(handler)

with pytest.raises(RuntimeError) as e:
drop.execute()

assert "Handler throw" in str(e.value)
assert "Drop throw" not in str(e.value)

0 comments on commit b71eae0

Please sign in to comment.