Skip to content

Commit

Permalink
Merge pull request #5337 from hypothesis/isolate-event-subscriber-fai…
Browse files Browse the repository at this point in the history
…lures

Isolate event subscriber failures when calling `request.notify_after_commit`
  • Loading branch information
robertknight committed Oct 10, 2018
2 parents fb196cc + 0295999 commit 914e671
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 59 deletions.
58 changes: 41 additions & 17 deletions h/eventqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,32 @@
import collections
import logging

from zope.interface import providedBy


log = logging.getLogger(__name__)


def _get_subscribers(registry, event):
# This code is adapted from the `subscribers` method in
# `zope.interface.adapter` which is what Pyramid's `request.registry.notify`
# is a very thin wrapper around.
return registry.adapters.subscriptions(map(providedBy, [event]), None)


class EventQueue(object):
"""
EventQueue enables dispatching Pyramid events at the end of a request.
An instance of this class is exposed on the request object via the
`notify_after_commit` method. The `_after_commit` part refers to the
database transaction associated with the request. Unlike calling
`request.registry.notify` during a request, failures will not cause a
database transaction rollback.
Events are dispatched in the order they are queued. Failure of one
event subscriber does not affect execution of other subscribers.
"""
def __init__(self, request):
self.request = request
self.queue = collections.deque()
Expand All @@ -25,23 +46,26 @@ def publish_all(self):
except IndexError:
break

try:
# Notify all subscribers to this particular event. Note that the
# order in which subcribers run is not guaranteed [1] and if one
# fails, remaining subscribers to the same event which have not
# yet run will be skipped.
#
# [1] See https://docs.pylonsproject.org/projects/pyramid/en/latest/narr/events.html
self.request.registry.notify(event)
except Exception:
sentry = getattr(event.request, 'sentry', None)
if sentry is not None:
sentry.captureException()
else:
log.exception('Queued event subscriber failed')

if event.request.debug:
raise
# Get subscribers to event and invoke them. The normal way to do
# this in Pyramid is to invoke `registry.notify`, but that provides
# no guarantee about the order of execution and any failure causes
# later subscribers not to run.
#
# Here we wrap each subscriber call in an exception handler to
# make failure independent in non-debug environments.
subscribers = _get_subscribers(self.request.registry, event)
for subscriber in subscribers:
try:
subscriber(event)
except Exception:
sentry = getattr(event.request, 'sentry', None)
if sentry is not None:
sentry.captureException()
else:
log.exception('Queued event subscriber failed')

if event.request.debug:
raise

def response_callback(self, request, response):
if request.exception is not None:
Expand Down
126 changes: 84 additions & 42 deletions tests/h/eventqueue_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,88 +7,130 @@
from h import eventqueue


class TestEventQueue(object):
def test_init_adds_response_callback(self, pyramid_request):
request = mock.Mock()
queue = eventqueue.EventQueue(request)
class DummyEvent(object):
def __init__(self, request):
# EventQueue currently assumes that events have a `request` attribute.
self.request = request

request.add_response_callback.assert_called_once_with(queue.response_callback)

def test_call_appends_event_to_queue(self):
queue = eventqueue.EventQueue(mock.Mock())
class TestEventQueue(object):
def test_init_adds_response_callback(self, queue, pyramid_request):
pyramid_request.add_response_callback = mock.Mock(autospec=pyramid_request.add_response_callback)
queue = eventqueue.EventQueue(pyramid_request)
pyramid_request.add_response_callback.assert_called_once_with(queue.response_callback)

def test_call_appends_event_to_queue(self, queue, event):
assert len(queue.queue) == 0
event = mock.Mock()

queue(event)
assert list(queue.queue) == [event]

def test_publish_all_notifies_events_in_fifo_order(self, notify, pyramid_request):
queue = eventqueue.EventQueue(pyramid_request)
firstevent = mock.Mock(request=pyramid_request)
def test_publish_all_notifies_events_in_fifo_order(self, queue, subscriber, event_factory, pyramid_config):
pyramid_config.add_subscriber(subscriber, DummyEvent)
firstevent = event_factory.get()
secondevent = event_factory.get()

queue(firstevent)
secondevent = mock.Mock(request=pyramid_request)
queue(secondevent)

queue.publish_all()

assert notify.call_args_list == [
mock.call(firstevent),
mock.call(secondevent)
]
subscriber.assert_has_calls([mock.call(firstevent), mock.call(secondevent)])

def test_publish_all_sandboxes_each_event(self, notify, pyramid_request):
queue = eventqueue.EventQueue(pyramid_request)
firstevent = mock.Mock(request=pyramid_request)
queue(firstevent)
secondevent = mock.Mock(request=pyramid_request)
queue(secondevent)
def test_publish_all_sandboxes_each_subscriber(self, event, subscriber_factory, failing_subscriber, pyramid_config, queue):
subscribers = [
subscriber_factory.get(),
failing_subscriber,
subscriber_factory.get()]

for sub in subscribers:
pyramid_config.add_subscriber(sub, DummyEvent)

queue(event)
queue.publish_all()

assert notify.call_args_list == [
mock.call(firstevent),
mock.call(secondevent)
]
# If one subscriber raises an exception, that shouldn't prevent others
# from running.
for sub in subscribers:
sub.assert_called_once_with(event)

def test_publish_all_sends_exception_to_sentry(self, notify, pyramid_request):
pyramid_request.sentry = mock.Mock()
notify.side_effect = ValueError('exploded!')
queue = eventqueue.EventQueue(pyramid_request)
event = mock.Mock(request=pyramid_request)
def test_publish_all_reraises_in_debug_mode(self, failing_subscriber, event, queue, pyramid_request, pyramid_config):
pyramid_config.add_subscriber(failing_subscriber, DummyEvent)
pyramid_request.debug = True

with pytest.raises(Exception) as excinfo:
queue(event)
queue.publish_all()
assert str(excinfo.value) == 'boom!'

def test_publish_all_sends_exception_to_sentry(self, failing_subscriber, event, queue, pyramid_request, pyramid_config):
pyramid_config.add_subscriber(failing_subscriber, DummyEvent)
pyramid_request.sentry = mock.Mock(spec_set=['captureException'])
queue(event)

queue.publish_all()

assert pyramid_request.sentry.captureException.called

def test_publish_all_logs_exception_when_sentry_is_not_available(self, log, notify, pyramid_request):
notify.side_effect = ValueError('exploded!')
queue = eventqueue.EventQueue(pyramid_request)
event = mock.Mock(request=pyramid_request)
def test_publish_all_logs_exception_when_sentry_is_not_available(self, log, failing_subscriber, event, queue, pyramid_config):
pyramid_config.add_subscriber(failing_subscriber, DummyEvent)
queue(event)

queue.publish_all()

assert log.exception.called

def test_response_callback_skips_publishing_events_on_exception(self, publish_all, pyramid_request):
def test_response_callback_skips_publishing_events_on_exception(self, publish_all, queue, pyramid_request):
pyramid_request.exception = ValueError('exploded!')
queue = eventqueue.EventQueue(pyramid_request)
queue.response_callback(pyramid_request, None)
assert not publish_all.called

def test_response_callback_publishes_events(self, publish_all, pyramid_request):
queue = eventqueue.EventQueue(pyramid_request)
queue(mock.Mock())
def test_response_callback_publishes_events(self, publish_all, event, queue, pyramid_request):
queue(event)
queue.response_callback(pyramid_request, None)
assert publish_all.called

@pytest.fixture
def log(self, patch):
return patch('h.eventqueue.log')
return patch('h.eventqueue.log', autospec=True)

@pytest.fixture
def publish_all(self, patch):
return patch('h.eventqueue.EventQueue.publish_all')
return patch('h.eventqueue.EventQueue.publish_all', autospec=True)

@pytest.fixture
def event(self, event_factory):
return event_factory.get()

@pytest.fixture
def event_factory(self, pyramid_request):

class EventFactory():
def get(self):
return DummyEvent(pyramid_request)

return EventFactory()

@pytest.fixture
def subscriber(self, subscriber_factory):
return subscriber_factory.get()

@pytest.fixture
def failing_subscriber(self, subscriber_factory):
sub = subscriber_factory.get()
sub.side_effect = Exception("boom!")
return sub

@pytest.fixture
def subscriber_factory(self):
class SubscriberFactory():
def get(self):
return mock.Mock()
return SubscriberFactory()

@pytest.fixture
def queue(self, pyramid_request):
return eventqueue.EventQueue(pyramid_request)

@pytest.fixture
def pyramid_request(self, pyramid_request):
Expand Down

0 comments on commit 914e671

Please sign in to comment.