Skip to content

Commit

Permalink
Enable auto-delete on AMQP queues
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelvl committed Jun 11, 2018
1 parent 0dd4ffc commit 08d8777
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 15 deletions.
44 changes: 33 additions & 11 deletions messagebus.py
Expand Up @@ -7,6 +7,7 @@
import jsonschema import jsonschema
import avro.schema, avro.io import avro.schema, avro.io
import base64 import base64
import random
import logging import logging


logger = logging.getLogger('messagebus') logger = logging.getLogger('messagebus')
Expand Down Expand Up @@ -82,29 +83,46 @@ def get_consumers(self, Consumer, channel):


def message_cb(self, message): def message_cb(self, message):
msg = message.payload msg = message.payload
jsonschema.validate(msg, ENVELOPE_SCHEMA) try:
schema = self.schema_registry.schema_get(msg['schema'], msg['version']) jsonschema.validate(msg, ENVELOPE_SCHEMA)

try:
bytes_reader = io.BytesIO(base64.b64decode(msg['message'])) schema = self.schema_registry.schema_get(msg['schema'], msg['version'])
decoder = avro.io.BinaryDecoder(bytes_reader) bytes_reader = io.BytesIO(base64.b64decode(msg['message']))
reader = avro.io.DatumReader(schema) decoder = avro.io.BinaryDecoder(bytes_reader)
msg = reader.read(decoder) reader = avro.io.DatumReader(schema)

msg = reader.read(decoder)
self.on_message(msg, message) self.on_message(msg, message)
except:
logging.error('Unable to validate message schema. Message: {}, schema ({}/{}): {}, delivery info: {}'.format(msg, msg['schema'], msg['version'], schema, message.delivery_info))
message.ack()
except:
logging.error('Unable to validate envelope schema. Message: {}, type {}, delivery info: {}'.format(msg, type(msg), message.delivery_info))
message.ack()


def on_message(self, payload, message): def on_message(self, payload, message):
message.ack() message.ack()




class TestAmqp(Amqp): class TestAmqp(Amqp):
def on_message(self, payload, message): def on_message(self, payload, message):
print 'Routing key:', message.delivery_info['routing_key'] print 'Delivery info: {}'.format(message.delivery_info)
print 'Payload:', payload print 'Payload:', payload
message.ack() message.ack()


def test_recv(args): def test_recv(args):
logging.debug('Entering test_recv(), key={}'.format(args.key)) logging.debug('Entering test_recv(), key={}'.format(args.key))
amqp = TestAmqp(args.url, args.exchange, args.exchange_type, [(args.queue, args.key)], [(args.queue, args.key)]) if args.queue:
qname = args.queue
else:
qname = 'q-recv-'+str(random.randint(1,1000))
amqp = TestAmqp(args.url, args.exchange, args.exchange_type, [(qname, args.key, args.noautodelete)], [(args.queue, args.key, args.noautodelete)])
amqp.run()

def test_logs(args):
'''Firehose logging. Enable with rabbitmqctl trace_on'''
logging.debug('Entering test_logs()')
queues = [('firehose', '#', True)]
amqp = TestAmqp(args.url, 'amq.rabbitmq.trace', 'topic', queues, queues)
amqp.run() amqp.run()




Expand All @@ -118,13 +136,17 @@ def test_recv(args):
parser.add_argument('--url', default='amqp://osmtracker-amqp') parser.add_argument('--url', default='amqp://osmtracker-amqp')
parser.add_argument('--exchange', default='osmtracker') parser.add_argument('--exchange', default='osmtracker')
parser.add_argument('--exchange-type', default='topic', choices=['topic', 'fanout']) parser.add_argument('--exchange-type', default='topic', choices=['topic', 'fanout'])
parser.add_argument('--noautodelete', default=True, action='store_false')
subparsers = parser.add_subparsers() subparsers = parser.add_subparsers()


parser_recv = subparsers.add_parser('recv') parser_recv = subparsers.add_parser('recv')
parser_recv.set_defaults(func=test_recv) parser_recv.set_defaults(func=test_recv)
parser_recv.add_argument('--key', default='new_cset.osmtracker') parser_recv.add_argument('--key', default='new_cset.osmtracker')
parser_recv.add_argument('--queue', default=None) parser_recv.add_argument('--queue', default=None)


parser_logs = subparsers.add_parser('logs')
parser_logs.set_defaults(func=test_logs)

args = parser.parse_args() args = parser.parse_args()
logging.getLogger('').setLevel(getattr(logging, args.log_level)) logging.getLogger('').setLevel(getattr(logging, args.log_level))


Expand Down
8 changes: 4 additions & 4 deletions osmtracker.py
Expand Up @@ -29,12 +29,12 @@


AMQP_EXCHANGE_TOPIC = 'osmtracker' # topic AMQP_EXCHANGE_TOPIC = 'osmtracker' # topic
AMQP_EXCHANGE_FANOUT = 'osmtracker_bc' # Fanout AMQP_EXCHANGE_FANOUT = 'osmtracker_bc' # Fanout
AMQP_FILTER_QUEUE = ('new_cset', 'new_cset.osmtracker', False) # name,key,auto_delete AMQP_FILTER_QUEUE = ('new_cset', 'new_cset.osmtracker', True) # name,key,auto_delete
AMQP_ANALYSIS_QUEUE = ('analysis_cset', 'analysis_cset.osmtracker', False) AMQP_ANALYSIS_QUEUE = ('analysis_cset', 'analysis_cset.osmtracker', True)
AMQP_REFRESH_QUEUE = ('refresh_cset', 'refresh_cset.osmtracker', False) AMQP_REFRESH_QUEUE = ('refresh_cset', 'refresh_cset.osmtracker', True)
AMQP_NEW_GENERATION_KEY = 'new_generation.osmtracker' AMQP_NEW_GENERATION_KEY = 'new_generation.osmtracker'
AMQP_NEW_POINTER_KEY = 'new_replication_pointer.osmtracker' AMQP_NEW_POINTER_KEY = 'new_replication_pointer.osmtracker'
AMQP_REPLICATION_POINTER_QUEUE = ('replication_pointer', AMQP_NEW_POINTER_KEY, False) AMQP_REPLICATION_POINTER_QUEUE = ('replication_pointer', AMQP_NEW_POINTER_KEY, True)
AMQP_QUEUES = [AMQP_FILTER_QUEUE, AMQP_ANALYSIS_QUEUE, AMQP_REFRESH_QUEUE] AMQP_QUEUES = [AMQP_FILTER_QUEUE, AMQP_ANALYSIS_QUEUE, AMQP_REFRESH_QUEUE]


EVENT_LABELS = ['event', 'action'] EVENT_LABELS = ['event', 'action']
Expand Down

0 comments on commit 08d8777

Please sign in to comment.