Skip to content

Commit

Permalink
implement batch task sending (#52)
Browse files Browse the repository at this point in the history
  • Loading branch information
cenkalti committed Apr 15, 2017
1 parent 7b4fbd2 commit bf6ce6b
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 0 deletions.
32 changes: 32 additions & 0 deletions kuyruk/__init__.py
@@ -1,4 +1,5 @@
import sys
import json
import logging
from contextlib import contextmanager

Expand All @@ -8,6 +9,7 @@
from kuyruk.task import Task
from kuyruk.config import Config
from kuyruk.worker import Worker
from kuyruk import signals

__version__ = '8.2.0'
__all__ = ['Kuyruk', 'Config', 'Task', 'Worker']
Expand Down Expand Up @@ -108,6 +110,36 @@ def connection(self):
with _safe_close(conn):
yield conn

def send_tasks_to_queue(self, subtasks):
if self.config.EAGER:
for subtask in subtasks:
subtask.task.apply(*subtask.args, **subtask.kwargs)
return

declared_queues = set()
with self.channel() as ch:
for subtask in subtasks:
queue = subtask.task._queue_for_host(subtask.host)
if queue not in declared_queues:
ch.queue_declare(queue=queue,
durable=True, auto_delete=False)
declared_queues.add(queue)

description = subtask.task._get_description(subtask.args,
subtask.kwargs)
subtask.task._send_signal(signals.task_presend,
args=subtask.args,
kwargs=subtask.kwargs,
description=description)

body = json.dumps(description)
msg = amqp.Message(body=body)
ch.basic_publish(msg, exchange="", routing_key=queue)
subtask.task._send_signal(signals.task_postsend,
args=subtask.args,
kwargs=subtask.kwargs,
description=description)


@contextmanager
def _safe_close(obj):
Expand Down
7 changes: 7 additions & 0 deletions kuyruk/task.py
Expand Up @@ -7,6 +7,7 @@
from uuid import uuid1
from datetime import datetime
from contextlib import contextmanager
from collections import namedtuple

import amqp

Expand Down Expand Up @@ -53,6 +54,9 @@ def __call__(self, *args, **kwargs):
logger.debug("Task.__call__ args=%r, kwargs=%r", args, kwargs)
self.send_to_queue(args, kwargs)

def subtask(self, args=(), kwargs={}, host=None):
return SubTask(self, args, kwargs, host)

def send_to_queue(self, args=(), kwargs={},
host=None, wait_result=None, message_ttl=None):
"""
Expand Down Expand Up @@ -194,6 +198,9 @@ def _module_name(self):
return name


SubTask = namedtuple("SubTask", ("task", "args", "kwargs", "host"))


@contextmanager
def time_limit(seconds):
def signal_handler(signum, frame):
Expand Down
10 changes: 10 additions & 0 deletions tests/integration/test_worker.py
Expand Up @@ -215,3 +215,13 @@ def test_sigusr1(self):
worker.expect('Consumer started')
worker.send_signal(signal.SIGUSR1)
worker.expect('traceback.format_stack')

def test_batch(self):
"""Batch tasks are run"""
li = [tasks.echo.subtask(args=("foo %i" % i, )) for i in range(2)]
tasks.kuyruk.send_tasks_to_queue(li)
with run_worker() as worker:
worker.expect('Consumer started')
worker.expect('foo 0')
worker.expect('foo 1')
worker.expect('Task is processed')

0 comments on commit bf6ce6b

Please sign in to comment.