Skip to content
Permalink
Browse files

ENH: lib/bot+lib/pipeline: optimize acks and recv

bot: Do not fetch the current message again, if it is still known
pipeline: keep state if there's already a message and give errors if a
violation is seen
  • Loading branch information...
wagner-certat committed Aug 9, 2019
1 parent 2fbd378 commit 100d342c4198d46e875d9cf0be4bd57e0dff12a8
Showing with 41 additions and 10 deletions.
  1. +3 −0 CHANGELOG.md
  2. +5 −0 intelmq/lib/bot.py
  3. +33 −10 intelmq/lib/pipeline.py
@@ -17,8 +17,11 @@ CHANGELOG
- `intelmq.lib.pipeline`:
- Redis: Use single connection client if calling bot is not multithreaded. Gives a small speed advantage.
- Require the bot instance as parameter for all pipeline classes.
- New internal variable `_has_message` to keep the state of the pipeline.
- Split receive and acknowledge into public-facing and private methods.
- `intelmq.lib.bot`:
- Log message after successful bot initialization, no log message anymore for ready pipeline.
- Use existing current message if receive is called and the current message still exists.
- `intelmq.lib.test`:
- Fix the tests broker by providing the testing pipeline.
- `intelmq.lib.utils`:
@@ -498,6 +498,7 @@ def __connect_pipelines(self):
queues=self.__source_queues,
bot=self)
self.__source_pipeline.connect()
self.__current_message = None
self.logger.debug("Connected to source queue.")

if self.__destination_queues:
@@ -556,6 +557,10 @@ def send_message(self, *messages, path="_default", auto_add=None,
path_permissive=path_permissive)

def receive_message(self):
if self.__current_message:
self.logger.debug("Reusing existing current message as incoming.")
return self.__current_message

self.logger.debug('Waiting for incoming message.')
message = None
while not message:
@@ -60,6 +60,8 @@ def create(parameters: object, logger: object,

class Pipeline(object):
has_internal_queues = False
# If the class currently holds a message, restricts the actions
_has_message = False

def __init__(self, parameters, logger, bot):
self.parameters = parameters
@@ -114,9 +116,24 @@ def send(self, message, path="_default", path_permissive=False):
raise NotImplementedError

def receive(self) -> str:
if self._has_message:
raise exceptions.PipelineError("There's already a message, first "
"acknowledge the existing one.")

retval = self._receive()
self._has_message = True
return retval

def _receive(self) -> str:
raise NotImplementedError

def acknowledge(self):
if not self._has_message:
raise exceptions.PipelineError("No message to acknowledge.")
self._acknowledge()
self._has_message = False

def _acknowledge(self):
raise NotImplementedError

def clear_queue(self, queue):
@@ -205,7 +222,7 @@ def send(self, message, path="_default", path_permissive=False):
'Look at redis\'s logs.')
raise exceptions.PipelineError(exc)

def receive(self) -> str:
def _receive(self) -> str:
if self.source_queue is None:
raise exceptions.ConfigurationError('pipeline', 'No source queue given.')
try:
@@ -223,11 +240,16 @@ def receive(self) -> str:
except Exception as exc:
raise exceptions.PipelineError(exc)

def acknowledge(self):
def _acknowledge(self):
try:
return self.pipe.rpop(self.internal_queue)
retval = self.pipe.rpop(self.internal_queue)
except Exception as e:
raise exceptions.PipelineError(e)
else:
if not retval:
raise exceptions.PipelineError("Could not pop message from internal queue"
"for acknowledgement. Return value was %r."
"" % retval)

def count_queued_messages(self, *queues) -> dict:
queue_dict = {}
@@ -243,11 +265,12 @@ def clear_queue(self, queue):
which is the same as an empty list in Redis"""
try:
retval = self.pipe.delete(queue)
if retval not in (0, 1):
raise ValueError("Error on redis queue deletion: Return value was not 0 "
"or 1 but %s." % retval)
except Exception as exc:
raise exceptions.PipelineError(exc)
else:
if retval not in (0, 1):
raise exceptions.PipelineError("Error on redis queue deletion: Return value"
" was not 0 or 1 but %r." % retval)

def nonempty_queues(self) -> set:
""" Returns a list of all currently non-empty queues. """
@@ -302,7 +325,7 @@ def send(self, message, path="_default", path_permissive=False):
else:
self.state[destination_queue] = [utils.encode(message)]

def receive(self) -> str:
def _receive(self) -> str:
"""
Receives the last not yet acknowledged message.
@@ -320,7 +343,7 @@ def receive(self) -> str:

return utils.decode(first_msg)

def acknowledge(self):
def _acknowledge(self):
"""Removes a message from the internal queue and returns it"""
self.state.get(self.internal_queue, [None]).pop(0)

@@ -454,7 +477,7 @@ def send(self, message: str, path="_default", path_permissive=False) -> None:
for destination_queue in queues:
self._send(destination_queue, message)

def receive(self) -> str:
def _receive(self) -> str:
if self.source_queue is None:
raise exceptions.ConfigurationError('pipeline', 'No source queue given.')
try:
@@ -465,7 +488,7 @@ def receive(self) -> str:
except Exception as exc:
raise exceptions.PipelineError(exc)

def acknowledge(self):
def _acknowledge(self):
try:
self.channel.basic_ack(delivery_tag=self.delivery_tag)
except pika.exceptions.ConnectionClosed:

0 comments on commit 100d342

Please sign in to comment.
You can’t perform that action at this time.