Skip to content

Commit

Permalink
Allow to define tasks in the Engine directly
Browse files Browse the repository at this point in the history
To make examples less cumbersome.
  • Loading branch information
NicolasLM committed Feb 27, 2018
1 parent 323f021 commit 670375f
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 57 deletions.
15 changes: 3 additions & 12 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,19 @@ Create task and schedule two jobs, one executed now and one later:

.. code:: python
from datetime import datetime, timedelta, timezone
from spinach import Engine, MemoryBroker
from spinach import Engine, Tasks, MemoryBroker
tasks = Tasks()
spin = Engine(MemoryBroker())
@tasks.task(name='compute')
@spin.task(name='compute')
def compute(a, b):
print('Computed {} + {} = {}'.format(a, b, a + b))
spin = Engine(MemoryBroker())
spin.attach_tasks(tasks)
# Schedule a job to be executed ASAP
spin.schedule('compute', 5, 3)
# Schedule a job to be executed in 10 seconds
in_10_seconds = datetime.now(timezone.utc) + timedelta(seconds=10)
spin.schedule_at('compute', in_10_seconds, 20, 10)
print('Starting workers, ^C to quit')
spin.start_workers()
Expand Down
23 changes: 18 additions & 5 deletions doc/user/tasks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,26 @@ Tasks Registry
Before being attached to a Spinach :class:`Engine`, tasks are created inside
a :class:`Tasks` registry.

Attaching tasks to a :class:`Tasks` registry instead of directly to the
:class:`Engine` allows to compose large applications in smaller units
independent from each other, the same way a Django project is composed of many
small Django apps.

This may seem cumbersome for trivial applications, like the examples in this
documentation, but there is a good reason not to directly attach tasks to an
:class:`Engine`.
documentation or some single-module projects, so those can create tasks
directly on the :class:`Engine` using::


spin = Engine(MemoryBroker())

@spin.task(name='fast')
def fast():
time.sleep(1)

Attaching tasks to a dumb :class:`Tasks` registry instead allows to compose
large applications in smaller units independent from each other, the same way a
Django project is composed of many small Django apps.
.. note:: Creating tasks directly in the :class:`Engine` is a bit like creating
a Flask app globally instead of using an `app factory`: it works
until a change introduces a circular import. Its usage should really
be limited to tiny projects.

.. autoclass:: spinach.task.Tasks
:members:
Expand Down
17 changes: 7 additions & 10 deletions examples/queues.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,27 @@
import time
import logging

from spinach import Engine, Tasks, MemoryBroker
from spinach import Engine, MemoryBroker


logging.basicConfig(
format='%(asctime)s - %(threadName)s %(levelname)s: %(message)s',
level=logging.DEBUG
)
tasks = Tasks()
spin = Engine(MemoryBroker())


@tasks.task(name='fast', queue='high-priority')
@spin.task(name='fast', queue='high-priority')
def fast():
time.sleep(1)


@tasks.task(name='slow', queue='low-priority')
@spin.task(name='slow', queue='low-priority')
def slow():
time.sleep(10)


spin = Engine(MemoryBroker())
spin.attach_tasks(tasks)

tasks.schedule('slow')
tasks.schedule('fast')
spin.schedule('slow')
spin.schedule('fast')

spin.start_workers(number=1, queue='low-priority')
spin.start_workers(number=1, queue='high-priority')
9 changes: 3 additions & 6 deletions examples/quickstart.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
from spinach import Engine, Tasks, MemoryBroker
from spinach import Engine, MemoryBroker

tasks = Tasks()
spin = Engine(MemoryBroker())


@tasks.task(name='compute')
@spin.task(name='compute')
def compute(a, b):
print('Computed {} + {} = {}'.format(a, b, a + b))


spin = Engine(MemoryBroker())
spin.attach_tasks(tasks)

# Schedule a job to be executed ASAP
spin.schedule('compute', 5, 3)

Expand Down
26 changes: 9 additions & 17 deletions spinach/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import time
from typing import Optional

from .task import Task, Tasks, Batch, RetryException
from .task import Tasks, Batch, RetryException
from .utils import human_duration, run_forever, exponential_backoff
from .job import Job, JobStatus
from .brokers.base import Broker
Expand All @@ -29,7 +29,9 @@ def __init__(self, broker: Broker, namespace: str=DEFAULT_NAMESPACE):
self._broker = broker
self._broker.namespace = namespace
self._namespace = namespace
self._tasks = {}

self._tasks = Tasks()
self.task = self._tasks.task

self._arbiter = None
self._workers = None
Expand All @@ -53,21 +55,11 @@ def attach_tasks(self, tasks: Tasks):
"""
if tasks._spin is not None and tasks._spin is not self:
logger.warning('Tasks already attached to a different Engine')
self._tasks.update(tasks.tasks)
self._tasks.update(tasks)
tasks._spin = self

def _get_task(self, name) -> Task:
task = self._tasks.get(name)
if task is not None:
return task

raise exc.UnknownTask(
'Unknown task "{}", known tasks: {}'
.format(name, list(self._tasks.keys()))
)

def execute(self, task_name: str, *args, **kwargs):
return self._get_task(task_name).func(*args, **kwargs)
return self._tasks.get(task_name).func(*args, **kwargs)

def schedule(self, task_name: str, *args, **kwargs):
"""Schedule a job to be executed as soon as possible.
Expand All @@ -90,7 +82,7 @@ def schedule_at(self, task_name: str, at: datetime, *args, **kwargs):
:arg args: args to be passed to the task function
:arg kwargs: kwargs to be passed to the task function
"""
task = self._get_task(task_name)
task = self._tasks.get(task_name)
job = Job(task.name, task.queue, at, task.max_retries, task_args=args,
task_kwargs=kwargs)
return self._broker.enqueue_jobs([job])
Expand All @@ -105,7 +97,7 @@ def schedule_batch(self, batch: Batch):
"""
jobs = list()
for task_name, at, args, kwargs in batch.jobs_to_create:
task = self._get_task(task_name)
task = self._tasks.get(task_name)
jobs.append(
Job(task.name, task.queue, at, task.max_retries,
task_args=args, task_kwargs=kwargs)
Expand All @@ -127,7 +119,7 @@ def _arbiter_func(self):
for job in jobs:
received_jobs += 1
try:
job.task_func = self._get_task(job.task_name).func
job.task_func = self._tasks.get(job.task_name).func
except exc.UnknownTask as err:
self._job_finished_callback(job, 0.0, err)
else:
Expand Down
21 changes: 18 additions & 3 deletions spinach/task.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from datetime import datetime, timezone
import functools
from typing import Optional, Callable
from typing import Optional, Callable, List
from numbers import Number

from . import const
from . import const, exc


class Task:
Expand Down Expand Up @@ -47,10 +47,26 @@ def __init__(self, queue: Optional[str]=None,
self.max_retries = max_retries
self._spin = None

def update(self, tasks: 'Tasks'):
self._tasks.update(tasks.tasks)

@property
def names(self) -> List[str]:
return list(self._tasks.keys())

@property
def tasks(self) -> dict:
return self._tasks

def get(self, name: str):
task = self._tasks.get(name)
if task is not None:
return task

raise exc.UnknownTask(
'Unknown task "{}", known tasks: {}'.format(name, self.names)
)

def task(self, func: Optional[Callable]=None, name: Optional[str]=None,
queue: Optional[str]=None, max_retries: Optional[Number]=None):
"""Decorator to register a task function.
Expand All @@ -65,7 +81,6 @@ def task(self, func: Optional[Callable]=None, name: Optional[str]=None,
>>> def foo():
... pass
"""

if func is None:
return functools.partial(self.task, name=name, queue=queue,
max_retries=max_retries)
Expand Down
6 changes: 3 additions & 3 deletions tests/test_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,17 @@ def test_attach_tasks(mock_logger, spin, spin_2):
spin.attach_tasks(tasks)
mock_logger.warning.assert_not_called()
assert tasks._spin is spin
assert spin._tasks == tasks.tasks
assert spin._tasks.tasks == tasks.tasks

spin.attach_tasks(tasks)
mock_logger.warning.assert_not_called()
assert tasks._spin is spin
assert spin._tasks == tasks.tasks
assert spin._tasks.tasks == tasks.tasks

spin_2.attach_tasks(tasks)
mock_logger.warning.assert_called_once_with(ANY)
assert tasks._spin is spin_2
assert spin_2._tasks == tasks.tasks
assert spin_2._tasks.tasks == tasks.tasks


def test_schedule_batch(patch_now):
Expand Down
33 changes: 32 additions & 1 deletion tests/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import pytest

from spinach.task import (Task, Tasks, Batch, RetryException)
from spinach import const
from spinach import const, exc

from .conftest import get_now

Expand Down Expand Up @@ -109,6 +109,37 @@ def foo(a, b=2):
assert foo(40, 3) == 43


def test_tasks_update():
tasks_1, tasks_2 = Tasks(), Tasks()

tasks_1.add(print, 'write_to_stdout', queue='foo_queue')
tasks_2.update(tasks_1)
assert tasks_1.tasks == tasks_2.tasks

tasks_2.add(print, 'bar')
assert tasks_1.tasks != tasks_2.tasks


def test_tasks_names():
tasks = Tasks()
assert tasks.names == []
tasks.add(print, 'foo')
tasks.add(print, 'bar')
assert sorted(tasks.names) == ['bar', 'foo']


def test_tasks_get():
tasks = Tasks()
with pytest.raises(exc.UnknownTask):
tasks.get('foo')

tasks.add(print, 'foo')
r = tasks.get('foo')
assert isinstance(r, Task)
assert r.name == 'foo'
assert r.func == print


def test_tasks_scheduling(task):
tasks = Tasks()
tasks.add(print, 'write_to_stdout', queue='foo_queue')
Expand Down

0 comments on commit 670375f

Please sign in to comment.