Skip to content

Commit

Permalink
Failures during processing now requeue the unprocessed messages. v1.0.3
Browse files Browse the repository at this point in the history
  • Loading branch information
toastdriven committed Jan 23, 2010
1 parent 7c8f1ff commit c061bed
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 4 deletions.
2 changes: 1 addition & 1 deletion queued_search/__init__.py
Expand Up @@ -2,7 +2,7 @@


__author__ = 'Daniel Lindsley'
__version__ = (1, 0, 2)
__version__ = (1, 0, 3)


def get_queue_name():
Expand Down
37 changes: 35 additions & 2 deletions queued_search/management/commands/process_search_queue.py
Expand Up @@ -30,6 +30,8 @@ def __init__(self, *args, **kwargs):
'update': set(),
'delete': set(),
}
self.processed_updates = set()
self.processed_deletes = set()

def handle_noargs(self, **options):
# Setup the queue.
Expand All @@ -56,10 +58,37 @@ def handle_noargs(self, **options):
pass

self.log.info("Queue consumed.")
self.handle_updates()
self.handle_deletes()

try:
self.handle_updates()
self.handle_deletes()
except Exception, e:
self.log.error('Exception seen during processing: %s' % e)
self.requeue()
raise e

self.log.info("Processing complete.")

def requeue(self):
"""
On failure, requeue all unprocessed messages.
"""
self.log.error('Requeuing unprocessed messages.')
update_count = 0
delete_count = 0

for update in self.actions['update']:
if not update in self.processed_updates:
self.queue.write('update:%s' % update)
update_count += 1

for delete in self.actions['delete']:
if not delete in self.processed_deletes:
self.queue.write('delete:%s' % delete)
delete_count += 1

self.log.error('Requeued %d updates and %d deletes.' % (update_count, delete_count))

def process_message(self, message):
"""
Given a message added by the ``QueuedSearchIndex``, add it to either
Expand Down Expand Up @@ -194,6 +223,9 @@ def handle_updates(self):
# instances.
current_index.backend.update(current_index, instances)
self.log.debug("Updated objects for '%s': %s" % (object_path, ", ".join(pks)))

for updated in instances:
self.processed_updates.add("%s.%s" % (object_path, updated.pk))

def handle_deletes(self):
"""
Expand Down Expand Up @@ -234,5 +266,6 @@ def handle_deletes(self):
for obj_identifier in obj_identifiers:
current_index.remove_object(obj_identifier)
pks.append(self.split_obj_identifier(obj_identifier)[1])
self.processed_deletes.add(obj_identifier)

self.log.debug("Deleted objects for '%s': %s" % (object_path, ", ".join(pks)))
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -4,7 +4,7 @@

setup(
name='queued_search',
version='1.0.2',
version='1.0.3',
description='A queuing setup for integration with Haystack.',
author='Daniel Lindsley',
author_email='daniel@toastdriven.com',
Expand Down
128 changes: 128 additions & 0 deletions tests/notes/tests.py
Expand Up @@ -285,3 +285,131 @@ def test_processing(self):
'Processing complete.'
])
self.assertEqual(SearchQuerySet().all().count(), 1)

def test_requeuing(self):
self.assertEqual(len(self.queue), 0)

note1 = Note.objects.create(
title='A test note',
content='Because everyone loves test data.',
author='Daniel'
)

self.assertEqual(len(self.queue), 1)

# Write a failed message.
self.queue.write('update:notes.note.abc')
self.assertEqual(len(self.queue), 2)

self.assertEqual(AssertableHandler.stowed_messages, [])

try:
# Call the command, which will fail.
call_command('process_search_queue')
self.fail("The command ran successfully, which is incorrect behavior in this case.")
except:
# We don't care that it failed. We just want to examine the state
# of things afterward.
pass

self.assertEqual(len(self.queue), 2)

# Pull the whole queue.
messages = []

try:
while True:
messages.append(self.queue.read())
except QueueException:
# We're out of queued bits.
pass

self.assertEqual(messages, [u'update:notes.note.1', 'update:notes.note.abc'])
self.assertEqual(len(self.queue), 0)

self.assertEqual(AssertableHandler.stowed_messages, [
'Starting to process the queue.',
u"Processing message 'update:notes.note.1'...",
u"Saw 'update' on 'notes.note.1'...",
u"Added 'notes.note.1' to the update list.",
"Processing message 'update:notes.note.abc'...",
"Saw 'update' on 'notes.note.abc'...",
"Added 'notes.note.abc' to the update list.",
'Queue consumed.',
"Exception seen during processing: invalid literal for int() with base 10: 'abc'",
'Requeuing unprocessed messages.',
'Requeued 2 updates and 0 deletes.'
])

# Start over.
note1 = Note.objects.create(
title='A test note',
content='Because everyone loves test data.',
author='Daniel'
)

self.assertEqual(len(self.queue), 1)

note2 = Note.objects.create(
title='Another test note',
content='Because everyone loves test data.',
author='Daniel'
)

self.assertEqual(len(self.queue), 2)

# Now delete it.
note2.delete()

# Write a failed message.
self.queue.write('delete:notes.note.abc')
self.assertEqual(len(self.queue), 4)

AssertableHandler.stowed_messages = []
self.assertEqual(AssertableHandler.stowed_messages, [])

try:
# Call the command, which will fail again.
call_command('process_search_queue')
self.fail("The command ran successfully, which is incorrect behavior in this case.")
except:
# We don't care that it failed. We just want to examine the state
# of things afterward.
pass

# Everything but the bad bit of data should have processed.
self.assertEqual(len(self.queue), 1)

# Pull the whole queue.
messages = []

try:
while True:
messages.append(self.queue.read())
except QueueException:
# We're out of queued bits.
pass

self.assertEqual(messages, ['delete:notes.note.abc'])
self.assertEqual(len(self.queue), 0)

self.assertEqual(AssertableHandler.stowed_messages, [
'Starting to process the queue.',
u"Processing message 'update:notes.note.2'...",
u"Saw 'update' on 'notes.note.2'...",
u"Added 'notes.note.2' to the update list.",
u"Processing message 'update:notes.note.3'...",
u"Saw 'update' on 'notes.note.3'...",
u"Added 'notes.note.3' to the update list.",
u"Processing message 'delete:notes.note.3'...",
u"Saw 'delete' on 'notes.note.3'...",
u"Added 'notes.note.3' to the delete list.",
"Processing message 'delete:notes.note.abc'...",
"Saw 'delete' on 'notes.note.abc'...",
"Added 'notes.note.abc' to the delete list.",
'Queue consumed.',
u"Updated objects for 'notes.note': 2",
"Exception seen during processing: Provided string 'notes.note.abc' is not a valid identifier.",
'Requeuing unprocessed messages.',
'Requeued 0 updates and 1 deletes.'
])

0 comments on commit c061bed

Please sign in to comment.