Skip to content

Commit

Permalink
Fixes celery.contrib.batches
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed Oct 15, 2012
1 parent 7e5ef9e commit d4fe921
Showing 1 changed file with 28 additions and 20 deletions.
48 changes: 28 additions & 20 deletions celery/contrib/batches.py
Expand Up @@ -12,7 +12,7 @@
.. code-block:: python
from celery.task import task
from celery import task
from celery.contrib.batches import Batches
# Flush after 100 messages, or 10 seconds.
Expand Down Expand Up @@ -43,9 +43,9 @@ def count_click(requests):
from Queue import Empty, Queue

from celery.task import Task
from celery.utils import timer2
from celery.utils.log import get_logger
from celery.worker import state
from celery.worker.job import Request


logger = get_logger(__name__)
Expand Down Expand Up @@ -136,30 +136,39 @@ def __init__(self):
self._count = count(1).next
self._tref = None
self._pool = None
self._logging = None

def run(self, requests):
    """Process one flushed batch of buffered requests.

    Abstract hook: subclasses override this with the batch body.
    (Presumably receives the ``SimpleRequest`` list built by
    :meth:`flush` — confirm against ``apply_buffer``.)
    """
    raise NotImplementedError('must implement run(requests)')

def flush(self, requests):
    """Run the batch body over everything currently buffered.

    Each buffered worker request is first wrapped in a lightweight
    ``SimpleRequest`` before the batch is handed to ``apply_buffer``.
    """
    simple_requests = [SimpleRequest.from_request(req) for req in requests]
    return self.apply_buffer(requests, (simple_requests, ))
def Strategy(self, task, app, consumer):
self._pool = consumer.pool
hostname = consumer.hostname
eventer = consumer.event_dispatcher
Req = Request
connection_errors = consumer.connection_errors
timer = consumer.timer
put_buffer = self._buffer.put
flush_buffer = self._do_flush

def task_message_handler(message, body, ack):
request = Req(body, on_ack=ack, app=app, hostname=hostname,
events=eventer, task=task,
connection_errors=connection_errors,
delivery_info=message.delivery_info)
put_buffer(request)

def execute(self, request, pool, loglevel, logfile):
if not self._pool: # just take pool from first task.
self._pool = pool
if not self._logging:
self._logging = loglevel, logfile
if self._tref is None: # first request starts flush timer.
self._tref = timer.apply_interval(self.flush_interval * 1000.0,
flush_buffer)

state.task_ready(request) # immediately remove from worker state.
self._buffer.put(request)
if not self._count() % self.flush_every:
flush_buffer()

if self._tref is None: # first request starts flush timer.
self._tref = timer2.apply_interval(self.flush_interval * 1000,
self._do_flush)
return task_message_handler

if not self._count() % self.flush_every:
self._do_flush()
def flush(self, requests):
    """Run the batch body over everything currently buffered.

    Wraps each buffered worker request in a lightweight
    ``SimpleRequest`` before handing the whole batch to
    :meth:`apply_buffer`.
    """
    return self.apply_buffer(requests, ([SimpleRequest.from_request(r)
                                         for r in requests], ))

def _do_flush(self):
logger.debug('Batches: Wake-up to flush buffer...')
Expand All @@ -185,8 +194,7 @@ def on_accepted(pid, time_accepted):
def on_return(result):
[req.acknowledge() for req in acks_late[True]]

loglevel, logfile = self._logging
return self._pool.apply_async(apply_batches_task,
(self, args, loglevel, logfile),
(self, args, 0, None),
accept_callback=on_accepted,
callback=acks_late[True] and on_return or None)

0 comments on commit d4fe921

Please sign in to comment.