Skip to content

Commit

Permalink
Merge b71eae0 into e383be7
Browse files Browse the repository at this point in the history
  • Loading branch information
juliancarrivick committed Nov 8, 2022
2 parents e383be7 + b71eae0 commit 2a58d93
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 19 deletions.
3 changes: 0 additions & 3 deletions daliuge-engine/dlg/data/drops/parset_drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#
import io
import os
from abc import abstractmethod

from dlg.drop import DataDROP, DEFAULT_INTERNAL_PARAMETERS
from dlg.data.io import MemoryIO
Expand Down Expand Up @@ -53,7 +52,6 @@ class ParameterSetDROP(DataDROP):

mode = dlg_string_param("mode", None)

@abstractmethod
def serialize_parameters(self, parameters: dict, mode):
"""
Returns a string representing a serialization of the parameters.
Expand All @@ -64,7 +62,6 @@ def serialize_parameters(self, parameters: dict, mode):
# Add more formats (.ini for example)
return "\n".join(f"{x}={y}" for x, y in parameters.items())

@abstractmethod
def filter_parameters(self, parameters: dict, mode):
"""
Returns a dictionary of parameters, with daliuge-internal or other parameters filtered out
Expand Down
19 changes: 13 additions & 6 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
DROPStates,
DROPRel,
)
from dlg.event import EventFirer
from dlg.event import EventFirer, EventHandler
from dlg.exceptions import InvalidDropException, InvalidRelationshipException
from dlg.data.io import (
DataIO,
Expand Down Expand Up @@ -129,7 +129,7 @@ def append(self, drop):
# ===============================================================================


class AbstractDROP(EventFirer):
class AbstractDROP(EventFirer, EventHandler):
"""
Base class for all DROP implementations.
Expand Down Expand Up @@ -734,7 +734,7 @@ def handleInterest(self, drop):
contained in the dropspec dictionaries held in the session.
"""

def _fire(self, eventType, **kwargs):
def _fire(self, eventType: str, **kwargs):
"""
Delivers an event of `eventType` to all interested listeners.
Expand Down Expand Up @@ -1199,7 +1199,7 @@ def path(self) -> str:
# @param dataclass Data Class/my.awesome.data.Component/String/ComponentParameter/readonly//False/False/The python class that implements this data component
# @param data_volume Data volume/5/Float/ComponentParameter/readwrite//False/False/Estimated size of the data contained in this node
# @param group_end Group end/False/Boolean/ComponentParameter/readwrite//False/False/Is this node the end of a group?
# @param streaming Streaming/False/Boolean/ComponentParameter/readwrite//False/False/Specifies whether this data component streams input and output data
# @param streaming Streaming/False/Boolean/ComponentParameter/readwrite//False/False/Specifies whether this data component streams input and output data
# @param persist Persist/False/Boolean/ComponentParameter/readwrite//False/False/Specifies whether this data component contains data that should not be deleted after execution
# @param dummy dummy//Object/InputPort/readwrite//False/False/Dummy input port
# @param dummy dummy//Object/OutputPort/readwrite//False/False/Dummy output port
Expand Down Expand Up @@ -1931,11 +1931,18 @@ def async_execute(self):
# Return immediately, but schedule the execution of this app
# If we have been given a thread pool use that
if hasattr(self, "_tp"):
self._tp.apply_async(self.execute)
self._tp.apply_async(self._execute_and_log_exception)
else:
t = threading.Thread(target=self.execute)
t = threading.Thread(target=self._execute_and_log_exception)
t.daemon = 1
t.start()
return t

def _execute_and_log_exception(self):
try:
self.execute()
except:
logger.exception("Unexpected exception during drop (%r) execution", self)

_dlg_proc_lock = threading.Lock()

Expand Down
31 changes: 21 additions & 10 deletions daliuge-engine/dlg/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
# MA 02111-1307 USA
#

import collections
from collections import defaultdict
import logging
from abc import ABC, abstractmethod
from typing import Optional, Union

logger = logging.getLogger(__name__)

Expand All @@ -36,13 +38,19 @@ class Event(object):
attached to individual instances of this class, depending on the event type.
"""

def __init__(self):
self.type = None
def __init__(self, type: str):
self.type = type

def __repr__(self, *args, **kwargs):
return "<Event %r>" % (self.__dict__)


class EventHandler(ABC):
@abstractmethod
def handleEvent(self, e: Event) -> None:
pass


class EventFirer(object):
"""
An object that fires events.
Expand All @@ -60,9 +68,12 @@ class EventFirer(object):
__ALL_EVENTS = object()

def __init__(self):
self._listeners = collections.defaultdict(list)
# Union string key with object to handle __ALL_EVENTS above
self._listeners: defaultdict[
Union[str, object], list[EventHandler]
] = defaultdict(list)

def subscribe(self, listener, eventType=None):
def subscribe(self, listener: EventHandler, eventType: Optional[str] = None):
"""
Subscribes `listener` to events fired by this object. If `eventType` is
not `None` then `listener` will only receive events of `eventType` that
Expand All @@ -74,7 +85,7 @@ def subscribe(self, listener, eventType=None):
eventType = eventType or EventFirer.__ALL_EVENTS
self._listeners[eventType].append(listener)

def unsubscribe(self, listener, eventType=None):
def unsubscribe(self, listener: EventHandler, eventType: Optional[str] = None):
"""
Unsubscribes `listener` from events fired by this object.
"""
Expand All @@ -86,7 +97,7 @@ def unsubscribe(self, listener, eventType=None):
if listener in self._listeners[eventType]:
self._listeners[eventType].remove(listener)

def _fireEvent(self, eventType, **attrs):
def _fireEvent(self, eventType: str, **attrs):
"""
Delivers an event of `eventType` to all interested listeners.
Expand All @@ -95,7 +106,7 @@ def _fireEvent(self, eventType, **attrs):
"""

# Which listeners should we call?
listeners = []
listeners: list[EventHandler] = []
if eventType in self._listeners:
listeners += self._listeners[eventType]
if EventFirer.__ALL_EVENTS in self._listeners:
Expand All @@ -106,8 +117,8 @@ def _fireEvent(self, eventType, **attrs):

# Now that we are sure there are listeners for our event
# create it and send it to all of them
e = Event()
e.type = eventType
e = Event(eventType)

for k, v in attrs.items():
setattr(e, k, v)

Expand Down
43 changes: 43 additions & 0 deletions daliuge-engine/test/test_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from typing import Optional
from dlg.event import EventFirer, EventHandler, Event
import pytest


class MockEventSource(EventFirer):
def fireEvent(self, eventType, **kwargs):
self._fireEvent(eventType, **kwargs)


class MockThrowingEventHandler(EventHandler):
def __init__(self) -> None:
self.wasCalled = False

def handleEvent(self, e: Event) -> None:
self.wasCalled = True
raise RuntimeError("MockThrow", e)


class MockEventHandler(EventHandler):
def __init__(self) -> None:
self.lastEvent: Optional[Event] = None

def handleEvent(self, e: Event) -> None:
self.lastEvent = e


def test_listener_exception_interrupts_later_handlers():
eventSource = MockEventSource()
handler1 = MockEventHandler()
handler2 = MockEventHandler()
throwingHandler = MockThrowingEventHandler()
eventSource.subscribe(handler1, "raise")
eventSource.subscribe(throwingHandler, "raise")
eventSource.subscribe(handler2, "raise")

with pytest.raises(RuntimeError):
eventSource.fireEvent("raise", prop="value")

assert throwingHandler.wasCalled
assert handler1.lastEvent is not None
assert getattr(handler1.lastEvent, "prop") == "value"
assert handler2.lastEvent is None
42 changes: 42 additions & 0 deletions daliuge-engine/test/test_input_fired_app_drop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import threading
from dlg.drop import InputFiredAppDROP
from dlg.event import Event, EventHandler
import pytest


class MockThrowingDrop(InputFiredAppDROP):
def run():
raise RuntimeError("Drop throw")


class MockThrowingHandler(EventHandler):
def handleEvent(self, e: Event) -> None:
raise RuntimeError("Handler throw")


def test_async_execute_catches_and_logs_unexpected_exception(
caplog: pytest.LogCaptureFixture,
):
drop = MockThrowingDrop("t", "t", n_effective_inputs=1)
handler = MockThrowingHandler()
drop.subscribe(handler)

thread = drop.async_execute()
assert isinstance(thread, threading.Thread)
thread.join()

assert "Handler throw" in caplog.text
# execute should handle exceptions in the run method
assert "Drop throw" not in caplog.text


def test_execute_propogates_unexpected_exception():
drop = MockThrowingDrop("t", "t", n_effective_inputs=1)
handler = MockThrowingHandler()
drop.subscribe(handler)

with pytest.raises(RuntimeError) as e:
drop.execute()

assert "Handler throw" in str(e.value)
assert "Drop throw" not in str(e.value)

0 comments on commit 2a58d93

Please sign in to comment.