Skip to content

Commit

Permalink
Added batch processing for updated items.
Browse files Browse the repository at this point in the history
  • Loading branch information
Sebastien Fievet authored and toastdriven committed Sep 13, 2010
1 parent c061bed commit d256bf2
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 5 deletions.
2 changes: 1 addition & 1 deletion AUTHORS
Expand Up @@ -7,4 +7,4 @@ Primary:
Contributors:
=============

None.
* Sébastien Fievet (zyegfryed) for adding batching to the ``process_search_queue`` command.
26 changes: 22 additions & 4 deletions queued_search/management/commands/process_search_queue.py
@@ -1,4 +1,5 @@
import logging
from optparse import make_option
from queues import queues, QueueException
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned
Expand All @@ -13,6 +14,7 @@
from sets import Set as set


DEFAULT_BATCH_SIZE = getattr(settings, 'HAYSTACK_BATCH_SIZE', 1000)
LOG_LEVEL = getattr(settings, 'SEARCH_QUEUE_LOG_LEVEL', logging.ERROR)

logging.basicConfig(
Expand All @@ -22,6 +24,13 @@
class Command(NoArgsCommand):
help = "Consume any objects that have been queued for modification in search."
can_import_settings = True
base_options = (
make_option('-b', '--batch-size', action='store', dest='batchsize',
default=DEFAULT_BATCH_SIZE, type='int',
help='Number of items to index at once.'
),
)
option_list = NoArgsCommand.option_list + base_options

def __init__(self, *args, **kwargs):
super(Command, self).__init__(*args, **kwargs)
Expand All @@ -34,6 +43,7 @@ def __init__(self, *args, **kwargs):
self.processed_deletes = set()

def handle_noargs(self, **options):
self.batchsize = options.get('batchsize', DEFAULT_BATCH_SIZE)
# Setup the queue.
self.queue = queues.Queue(get_queue_name())

Expand Down Expand Up @@ -221,11 +231,19 @@ def handle_updates(self):
# Update the batch of instances for this class.
# Use the backend instead of the index because we can batch the
# instances.
current_index.backend.update(current_index, instances)
self.log.debug("Updated objects for '%s': %s" % (object_path, ", ".join(pks)))
total = len(instances)
self.log.debug("Indexing %d %s." % (total, object_path))
for start in range(0, total, self.batchsize):
end = min(start + self.batchsize, total)
batch_instances = instances[start:end]

self.log.debug(" indexing %s - %d of %d." % (start+1, end, total))
current_index.backend.update(current_index, batch_instances)

for updated in batch_instances:
self.processed_updates.add("%s.%s" % (object_path, updated.pk))

for updated in instances:
self.processed_updates.add("%s.%s" % (object_path, updated.pk))
self.log.debug("Updated objects for '%s': %s" % (object_path, ", ".join(pks)))

def handle_deletes(self):
"""
Expand Down
4 changes: 4 additions & 0 deletions tests/notes/tests.py
Expand Up @@ -280,6 +280,8 @@ def test_processing(self):
u"Saw 'delete' on 'notes.note.3'...",
u"Added 'notes.note.3' to the delete list.",
'Queue consumed.',
u'Indexing 1 notes.note.',
' indexing 1 - 1 of 1.',
u"Updated objects for 'notes.note': 2",
u"Deleted objects for 'notes.note': 1, 3",
'Processing complete.'
Expand Down Expand Up @@ -408,6 +410,8 @@ def test_requeuing(self):
"Saw 'delete' on 'notes.note.abc'...",
"Added 'notes.note.abc' to the delete list.",
'Queue consumed.',
u'Indexing 1 notes.note.',
' indexing 1 - 1 of 1.',
u"Updated objects for 'notes.note': 2",
"Exception seen during processing: Provided string 'notes.note.abc' is not a valid identifier.",
'Requeuing unprocessed messages.',
Expand Down

0 comments on commit d256bf2

Please sign in to comment.