Skip to content

Commit

Permalink
bound tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
cenkalti committed Jun 6, 2013
1 parent 12fcbb4 commit 7f969ec
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 17 deletions.
10 changes: 7 additions & 3 deletions kuyruk/kuyruk.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(self, config=None, task_class=Task):
self.config.from_object(config)

def task(self, queue='kuyruk', eager=False, retry=0, task_class=None,
max_run_time=None, local=False):
max_run_time=None, local=False, arg_class=None):
"""
Wrap functions with this decorator to convert them to background
tasks. After wrapping, calling the function will send a message to
Expand All @@ -46,6 +46,10 @@ def task(self, queue='kuyruk', eager=False, retry=0, task_class=None,
If this is :const:`None` then :attr:`Task.task_class` will be used.
:param max_run_time: Maximum allowed time in seconds for task to
complete.
:param arg_class: Class of the first argument. If it is present,
the first argument will be converted to it's ``id`` when sending the
task to the queue and it will be reloaded on worker when running
the task.
:return: Callable :class:`~Task` object wrapping the original function.
"""
Expand All @@ -57,8 +61,8 @@ def inner(f):
task_class_ = task_class or self.task_class
return task_class_(
f, self,
queue=queue_, eager=eager, local=local,
retry=retry, max_run_time=max_run_time)
queue=queue_, eager=eager, local=local, retry=retry,
max_run_time=max_run_time, arg_class=arg_class)
return inner

if callable(queue):
Expand Down
40 changes: 33 additions & 7 deletions kuyruk/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import logging
from time import time
from uuid import uuid1
from types import MethodType
from datetime import datetime
from functools import wraps
from contextlib import contextmanager
Expand All @@ -33,8 +32,8 @@ def inner(self, *args, **kwargs):

class Task(EventMixin):

def __init__(self, f, kuyruk, queue='kuyruk', local=False,
eager=False, retry=0, max_run_time=None):
def __init__(self, f, kuyruk, queue='kuyruk', local=False, eager=False,
retry=0, max_run_time=None, arg_class=None):
self.f = f
self.kuyruk = kuyruk
self.queue = queue
Expand All @@ -43,6 +42,7 @@ def __init__(self, f, kuyruk, queue='kuyruk', local=False,
self.retry = retry
self.max_run_time = max_run_time
self.cls = None
self.arg_class = arg_class
self.setup()

def setup(self):
Expand All @@ -60,15 +60,20 @@ def __call__(self, *args, **kwargs):
without changing the client code.
"""
logger.debug("type(self) = %r", type(self))
logger.debug("self.cls=%r", self.cls)
logger.debug("args=%r, kwargs=%r", args, kwargs)

self.send_signal(events.task_presend, args, kwargs, reverse=True)

task_result = TaskResult(self)

host = kwargs.pop('kuyruk_host', None)
local = kwargs.pop('kuyruk_local', None)

if self.eager or self.kuyruk.config.EAGER:
task_result.result = self.apply(*args, **kwargs)
else:
host = kwargs.pop('kuyruk_host', None)
local = kwargs.pop('kuyruk_local', None)
task_result.id = self.send_to_queue(args, kwargs,
host=host, local=local)

Expand All @@ -91,7 +96,8 @@ def __get__(self, obj, objtype):
"""
self.cls = objtype
if obj:
return MethodType(self.__call__, obj, objtype)
logger.debug("Creating bound task with obj=%r", obj)
return BoundTask(self, obj)
return self

def send_to_queue(self, args, kwargs, host=None, local=None):
Expand Down Expand Up @@ -129,7 +135,7 @@ def get_task_description(self, args, kwargs, queue):
"""Return the dictionary to be sent to the queue."""

# For class tasks; replace the first argument with the id of the object
if self.cls:
if self.cls or self.arg_class:
args = list(args)
args[0] = args[0].id

Expand Down Expand Up @@ -212,6 +218,26 @@ def class_name(self):
return self.cls.__name__


class BoundTask(Task):

def __init__(self, task, obj):
self.task = task
self.obj = obj

def __getattr__(self, item):
return getattr(self.task, item)

def __call__(self, *args, **kwargs):
args = list(args)
args.insert(0, self.obj)
return super(BoundTask, self).__call__(*args, **kwargs)

def apply(self, *args, **kwargs):
args = list(args)
args.insert(0, self.obj)
return super(BoundTask, self).apply(*args, **kwargs)


class TaskResult(object):
"""Insance of this class is returned after the task is sent to queue.
Since Kuyruk does not support a result backend yet it will raise
Expand Down
11 changes: 11 additions & 0 deletions kuyruk/test/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,23 @@ def get(cls, id):
@kuyruk.task
def meow(self, message):
print "Felix says:", message
must_be_called()

@kuyruk.task
def raise_exception(self):
raise Exception


@kuyruk.task(arg_class=Cat)
def jump(cat):
print "%s jumps high!" % cat.name
must_be_called()


def must_be_called():
print 'Yes, it is called.'


class DatabaseTask(Task):

def setup(self):
Expand Down
17 changes: 14 additions & 3 deletions kuyruk/test/test_kuyruk.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import signal
import inspect
import logging
import unittest

Expand All @@ -8,7 +7,7 @@

import tasks
from kuyruk import Task
from kuyruk.task import TaskResult
from kuyruk.task import TaskResult, BoundTask
from util import *

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -166,12 +165,18 @@ def test_extend(self):
def test_class_task(self):
cat = tasks.Cat(1, 'Felix')
self.assertTrue(isinstance(tasks.Cat.meow, Task))
self.assertTrue(inspect.ismethod(cat.meow))
self.assertTrue(isinstance(cat.meow, BoundTask))

cat.meow('Oh my god')
with run_kuyruk() as master:
master.expect('Oh my god')

@patch('kuyruk.test.tasks.must_be_called')
def test_class_task_eager(self, mock_func):
cat = tasks.Cat(1, 'Felix')
cat.meow.apply('Oh my god')
mock_func.assert_called_once_with()

def test_class_task_fail(self):
cat = tasks.Cat(1, 'Felix')

Expand All @@ -181,6 +186,12 @@ def test_class_task_fail(self):
master.expect('Saving failed task')
master.expect('Saved')

def test_class_task_function(self):
cat = tasks.Cat(1, 'Felix')
tasks.jump(cat)
with run_kuyruk() as master:
master.expect('Felix jumps high!')

def test_task_name(self):
self.assertEqual(tasks.Cat.meow.name, 'kuyruk.test.tasks:Cat.meow')
self.assertEqual(tasks.print_task.name, 'kuyruk.test.tasks:print_task')
Expand Down
11 changes: 7 additions & 4 deletions kuyruk/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,14 +247,17 @@ def apply_task(self, task, args, kwargs):
"""Imports and runs the wrapped function in task."""

# Fetch the object if class task
if task.cls:
cls = task.arg_class or task.cls
if cls:
if not args:
raise InvalidTask

if not isinstance(args[0], task.cls):
obj = task.cls.get(args[0])
if not obj:
obj_id = args[0]
if not isinstance(obj_id, cls):
obj = cls.get(obj_id)
if obj is None:
raise ObjectNotFound

args[0] = obj

result = task.apply(*args, **kwargs)
Expand Down

0 comments on commit 7f969ec

Please sign in to comment.