Skip to content

Commit

Permalink
Merge pull request #5352 from hypothesis/realtime-publish-retry
Browse files Browse the repository at this point in the history
Enable retrying AMQP message publication in `h.realtime._publish`
  • Loading branch information
robertknight committed Oct 9, 2018
2 parents 7246462 + d477721 commit 593a372
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 5 deletions.
7 changes: 6 additions & 1 deletion h/realtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,18 @@ def publish_user(self, payload):

def _publish(self, routing_key, payload):
headers = {'timestamp': datetime.utcnow().isoformat() + 'Z'}
retry_policy = {'max_retries': 5,
'interval_start': 0.2,
'interval_step': 0.3}

with producer_pool[self.connection].acquire(block=True) as producer:
producer.publish(payload,
exchange=self.exchange,
declare=[self.exchange],
routing_key=routing_key,
headers=headers)
headers=headers,
retry=True,
retry_policy=retry_policy)


def get_exchange():
Expand Down
20 changes: 16 additions & 4 deletions tests/h/realtime_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ def generate_queue_name(self, patch):


class TestPublisher(object):
def test_publish_annotation(self, matchers, producer_pool, pyramid_request):
def test_publish_annotation(self, matchers, producer_pool, pyramid_request,
retry_policy):
payload = {'action': 'create', 'annotation': {'id': 'foobar'}}
producer = producer_pool['foobar'].acquire().__enter__()
exchange = realtime.get_exchange()
Expand All @@ -112,9 +113,12 @@ def test_publish_annotation(self, matchers, producer_pool, pyramid_request):
exchange=exchange,
declare=[exchange],
routing_key='annotation',
headers=expected_headers)
headers=expected_headers,
retry=True,
retry_policy=retry_policy)

def test_publish_user(self, matchers, producer_pool, pyramid_request):
def test_publish_user(self, matchers, producer_pool, pyramid_request,
retry_policy):
payload = {'action': 'create', 'user': {'id': 'foobar'}}
producer = producer_pool['foobar'].acquire().__enter__()
exchange = realtime.get_exchange()
Expand All @@ -127,7 +131,15 @@ def test_publish_user(self, matchers, producer_pool, pyramid_request):
exchange=exchange,
declare=[exchange],
routing_key='user',
headers=expected_headers)
headers=expected_headers,
retry=True,
retry_policy=retry_policy)

@pytest.fixture
def retry_policy(self):
return {'max_retries': 5,
'interval_start': 0.2,
'interval_step': 0.3}

@pytest.fixture
def producer_pool(self, patch):
Expand Down

0 comments on commit 593a372

Please sign in to comment.