Skip to content

Commit

Permalink
Code cleanup in rabbitmq backend.
Browse files Browse the repository at this point in the history
  • Loading branch information
coleifer committed Jan 6, 2015
1 parent c224a13 commit 91cd598
Showing 1 changed file with 49 additions and 37 deletions.
86 changes: 49 additions & 37 deletions huey/backends/rabbitmq_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,40 +33,49 @@ def __init__(self, name, **connection):
super(RabbitQueue, self).__init__(name, **connection)

self.queue_name = 'huey.rabbit.%s' % clean_name(name)
credentials = pika.PlainCredentials(connection.get('username', 'guest'),
connection.get('password', 'guest'))
self.conn = pika.BlockingConnection(pika.ConnectionParameters(host=connection.get('host', 'localhost'),
port=connection.get('port', 5672),
credentials=credentials,
virtual_host=connection.get('vhost', '/'),
ssl=connection.get('ssl', False)))
credentials = pika.PlainCredentials(
connection.get('username', 'guest'),
connection.get('password', 'guest'))
connection_params = pika.ConnectionParameters(
host=connection.get('host', 'localhost'),
port=connection.get('port', 5672),
credentials=credentials,
virtual_host=connection.get('vhost', '/'),
ssl=connection.get('ssl', False))

self.conn = pika.BlockingConnection(connection_params)
self.channel = self.conn.channel()
self.channel.queue_declare(self.queue_name, durable=True)

def write(self, data):
self.channel.basic_publish(exchange='',
routing_key=self.queue_name,
body=data)
self.channel.basic_publish(
exchange='',
routing_key=self.queue_name,
body=data)

def read(self):
data = self.get_data_from_queue(self.queue_name)
return data
return self.get_data_from_queue(self.queue_name)

def remove(self, data):
#This is not something you usually do in rabbit, this is the only operation, which is not atomic, but
#this "hack" should do the trick
# This is not something you usually do in rabbit, this is the only
# operation, which is not atomic, but this "hack" should do the trick.
amount = 0
idx = 0
qlen = len(self)
for method_frame, properties, body in self.channel.consume(self.queue_name, ):

for method_frame, _, body in self.channel.consume(self.queue_name):
idx += 1
if body == data:
self.channel.basic_ack(method_frame.delivery_tag)
amount += 1
else:
self.channel.basic_nack(method_frame.delivery_tag, requeue=True)
self.channel.basic_nack(
method_frame.delivery_tag,
requeue=True)

if idx >= qlen:
break

self.channel.cancel()
return amount

Expand All @@ -75,15 +84,15 @@ def flush(self):
return True

def __len__(self):
q = self.channel.queue_declare(self.queue_name, durable=True)
q_len = q.method.message_count
return q_len
queue = self.channel.queue_declare(self.queue_name, durable=True)
return queue.method.message_count

def get_data_from_queue(self, queue):
data = None
if len(self) == 0:
return None
for method_frame, properties, body in self.channel.consume(queue):

for method_frame, _, body in self.channel.consume(queue):
data = body
self.channel.basic_ack(method_frame.delivery_tag)
break
Expand All @@ -110,32 +119,35 @@ def read(self):
class RabbitEventEmitter(BaseEventEmitter):
def __init__(self, channel, **connection):
super(RabbitEventEmitter, self).__init__(channel, **connection)
credentials = pika.PlainCredentials(connection.get('username', 'guest'),
connection.get('password', 'guest'))
self.conn = pika.BlockingConnection(pika.ConnectionParameters(host=connection.get('host', 'localhost'),
port=connection.get('port', 5672),
credentials=credentials,
virtual_host=connection.get('vhost', '/'),
ssl=connection.get('ssl', False)))
credentials = pika.PlainCredentials(
connection.get('username', 'guest'),
connection.get('password', 'guest'))
connection_params = pika.ConnectionParameters(
host=connection.get('host', 'localhost'),
port=connection.get('port', 5672),
credentials=credentials,
virtual_host=connection.get('vhost', '/'),
ssl=connection.get('ssl', False))

self.conn = pika.BlockingConnection(connection_params)
self.channel = self.conn.channel()
self.exchange_name = 'huey.events'
self.channel.exchange_declare(exchange=self.exchange_name,
type='fanout',
auto_delete=False,
durable=True)
self.channel.exchange_declare(
exchange=self.exchange_name,
type='fanout',
auto_delete=False,
durable=True)

def emit(self, message):
properties = pika.BasicProperties(
content_type="text/plain",
delivery_mode=2
)
delivery_mode=2)

self.channel.basic_publish(
exchange=self.exchange_name,
routing_key='',
body=message,
properties=properties
)
properties=properties)


Components = (RabbitBlockingQueue, RabbitQueue,
RabbitEventEmitter)
Components = (RabbitBlockingQueue, None, None, RabbitEventEmitter)

0 comments on commit 91cd598

Please sign in to comment.