Skip to content

Commit

Permalink
Add tests for Celery/ServiceBus integration (#134)
Browse files Browse the repository at this point in the history
* Add test for Celery/ServiceBus integration

* Update to latest kombu version
  • Loading branch information
c-w committed Feb 4, 2019
1 parent 216e321 commit 999fa65
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 2 deletions.
2 changes: 1 addition & 1 deletion requirements-prod.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
azure-servicebus==0.21.1 # pyup: ignore
azure-servicebus==0.50.0
gunicorn==19.9.0
tornado==5.1.1
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ requests==2.21.0
sendgrid==5.6.0
typing-extensions==3.7.2
typing==3.6.6
https://github.com/celery/kombu/archive/b3dc9208837566193deda824bc67dc900c7ed9a6.zip
https://github.com/celery/kombu/archive/483cadced77d82a6ecd0be553b91ce92f04f9617.zip
celery==4.2.1
xtarfile[zstd]==0.0.2
Empty file.
39 changes: 39 additions & 0 deletions tests/opwen_email_server/integration/test_celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from os import getenv
from random import choice
from string import ascii_letters
from unittest import TestCase
from unittest import skipUnless

from cached_property import cached_property
from kombu import Connection
from kombu import Exchange
from kombu import Queue

from opwen_email_server.config import QUEUE_BROKER


class TransportTests(TestCase):
exchange_name = getenv('KOMBU_EXCHANGE', 'testexchange')
queue_name = getenv('KOMBU_QUEUE', 'testqueue')
routing_key = getenv('KOMBU_ROUTING_KEY', 'testkey')

@cached_property
def exchange(self) -> Exchange:
return Exchange(self.exchange_name, 'direct', durable=True)

@cached_property
def queue(self) -> Queue:
return Queue(self.queue_name, exchange=self.exchange,
routing_key=self.routing_key)

@skipUnless(QUEUE_BROKER, 'no celery broker configured')
def test_send_message(self):
random_message = ''.join(choice(ascii_letters) # nosec
for _ in range(30))

with Connection(QUEUE_BROKER) as connection:
producer = connection.Producer()
producer.publish({'message': random_message, 'test': True},
exchange=self.exchange,
routing_key=self.routing_key,
declare=[self.queue])

0 comments on commit 999fa65

Please sign in to comment.