Skip to content

Commit

Permalink
Rename Spinach instance to Engine
Browse files Browse the repository at this point in the history
  • Loading branch information
NicolasLM committed Feb 24, 2018
1 parent f03189a commit f33114a
Show file tree
Hide file tree
Showing 17 changed files with 109 additions and 41 deletions.
4 changes: 2 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ Create task and schedule two jobs, one executed now and one later:
from datetime import datetime, timedelta, timezone
from spinach import Tasks, MemoryBroker, Spinach
from spinach import Engine, Tasks, MemoryBroker
tasks = Tasks()
Expand All @@ -43,7 +43,7 @@ Create task and schedule two jobs, one executed now and one later:
print('Computed {} + {} = {}'.format(a, b, a + b))
spin = Spinach(MemoryBroker())
spin = Engine(MemoryBroker())
spin.attach_tasks(tasks)
# Schedule a job to be executed ASAP
Expand Down
1 change: 1 addition & 0 deletions doc/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Getting started with spinach:
user/install
user/tasks
user/jobs
user/engine
user/queues
user/integrations
user/signals
Expand Down
32 changes: 32 additions & 0 deletions doc/user/engine.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
.. _engine:

Engine
======

The Spinach :class:`Engine` is what connects tasks, jobs, brokers and workers
together.

It is possible, but unusual, to have multiple Engines running in the same
Python interpreter.

.. autoclass:: spinach.engine.Engine
:members:

Namespace
---------

When multiple Spinach Engines use the same Redis server, for example when
production and staging share the same database, different namespaces are used
to make sure they do not step on each other's feet.

The production application would contain::

spin = Engine(RedisBroker(), namespace='prod')

While the staging application would contain::

spin = Engine(RedisBroker(), namespace='stg')

.. note:: Using different Redis database numbers (0, 1, 2...) for different
environments is not enough as Redis pubsubs are shared among
databases. Namespaces solve this problem.
3 changes: 2 additions & 1 deletion doc/user/integrations.rst
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ The integration just needs to be registered before starting workers::
raven_client = Client('https://sentry_dsn/42')
register_sentry(raven_client)

spin = Spinach(MemoryBroker())
spin = Engine(MemoryBroker())
spin.start_workers()

.. autofunction:: spinach.contrib.sentry.register_sentry
6 changes: 4 additions & 2 deletions doc/user/production.rst
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@ Spinach:
- Tasks that are NOT safe to be retried have their `max_retries` set to `0`
- Tasks that are safe to be retried have their `max_retries` set to a positive
number
- Retries happen after an exponential delay with randomized jitter (the default)
- Retries happen after an exponential delay with randomized jitter (the
default)
- Task `args` and `kwargs` are JSON serializable
- The user's code is thread-safe
- Tasks do not store state in the process between invocations
- Configure logging and send exceptions to Sentry, see :doc:`integrations`
- Use different queues if tasks have different usage pattens, see :doc:`queues`
- Use different namespaces if multiple Spinach applications share the same Redis
- Use different namespaces if multiple Spinach applications share the same
Redis, see :doc:`engine`

Redis:

Expand Down
2 changes: 1 addition & 1 deletion doc/user/queues.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ workers to executing only tasks of this particular queue.
.. note:: By default all tasks and all workers use the ``spinach`` queue

.. note:: Namespaces and queues are different concepts. While queues share the
same Spinach application, namespaces make two Spinach applications
same Spinach :class:`Engine`, namespaces make two Spinach Engines
invisible to each other while still using the same broker.
19 changes: 10 additions & 9 deletions doc/user/signals.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,26 @@ Subscribing to a signal is done via its ``connect`` decorator::
print('Job {} started'.format(job))

The first argument given to your function is always the namespace of your
Spinach, the following arguments depend on the signal itself.
Spinach :class:`Engine`, the following arguments depend on the signal itself.

Subscribing to signals of a specific Spinach
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Subscribing to signals of a specific Spinach Engine
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

As your application gets bigger you may end up running multiple ``Spinach`` in
As your application gets bigger you may end up running multiple Engines in
the same interpreter. The ``connect_via`` decorator allows to subscribe to the
signals sent by a specific ``Spinach``::
signals sent by a specific Spinach :class:`Engine`::

from spinach import Spinach, MemoryBroker, signals
from spinach import Engine, MemoryBroker, signals

foo_spin = Spinach(MemoryBroker(), namespace='foo')
bar_spin = Spinach(MemoryBroker(), namespace='bar')
foo_spin = Engine(MemoryBroker(), namespace='foo')
bar_spin = Engine(MemoryBroker(), namespace='bar')

@signals.job_started.connect_via(foo_spin.namespace)
def job_started(namespace, job, **kwargs):
print('Job {} started on Foo'.format(job))

In this example only signals sent by the `foo` ``Spinach`` will be received.
In this example only signals sent by the `foo` :class:`Engine` will be
received.

Available signals
-----------------
Expand Down
10 changes: 5 additions & 5 deletions doc/user/tasks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ To define a task::
Retries and Idempotency
-----------------------

Spinach knows two kinds of tasks: the ones that are idempotent and the ones that
are not. Since Spinach cannot guess if a task code is safe to be retried
Spinach knows two kinds of tasks: the ones that are idempotent and the ones
that are not. Since Spinach cannot guess if a task code is safe to be retried
multiple times, it must be annotated when the task is created.

Non-Idempotent Tasks
Expand Down Expand Up @@ -60,12 +60,12 @@ Idempotent tasks are defined with a positive `max_retries` value::
Tasks Registry
--------------

Before being attached to a :class:`Spinach` instance, tasks are created inside
Before being attached to a Spinach :class:`Engine`, tasks are created inside
a :class:`Tasks` registry.

This may seem cumbersome for trivial applications, like the examples in this
documentation, but there is a good reason not to directly attach tasks to a
:class:`Spinach` instance.
documentation, but there is a good reason not to directly attach tasks to an
:class:`Engine`.

Attaching tasks to a dumb :class:`Tasks` registry instead allows to compose
large applications in smaller units independent from each other, the same way a
Expand Down
4 changes: 2 additions & 2 deletions examples/queues.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import time
import logging

from spinach import Spinach, Tasks, MemoryBroker
from spinach import Engine, Tasks, MemoryBroker


logging.basicConfig(
Expand All @@ -21,7 +21,7 @@ def slow():
time.sleep(10)


spin = Spinach(MemoryBroker())
spin = Engine(MemoryBroker())
spin.attach_tasks(tasks)

tasks.schedule('slow')
Expand Down
4 changes: 2 additions & 2 deletions examples/quickstart.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from spinach import Spinach, Tasks, MemoryBroker
from spinach import Engine, Tasks, MemoryBroker

tasks = Tasks()

Expand All @@ -8,7 +8,7 @@ def compute(a, b):
print('Computed {} + {} = {}'.format(a, b, a + b))


spin = Spinach(MemoryBroker())
spin = Engine(MemoryBroker())
spin.attach_tasks(tasks)

# Schedule a job to be executed ASAP
Expand Down
2 changes: 1 addition & 1 deletion spinach/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from .brokers.memory import MemoryBroker
from .brokers.redis import RedisBroker
from .const import VERSION
from .spinach import Spinach
from .engine import Engine
from .task import Tasks

__version__ = VERSION
2 changes: 1 addition & 1 deletion spinach/contrib/sentry.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def register_sentry(raven_client, namespace: Optional[str]=None):
:param raven_client: configured Raven client used to sent errors to Sentry
:param namespace: optionally only register the Sentry integration for a
particular Spinach instance.
particular Spinach :class:`Engine`.
"""

@signals.job_started.connect_via(namespace)
Expand Down
37 changes: 33 additions & 4 deletions spinach/spinach.py → spinach/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,14 @@
logger = getLogger(__name__)


class Spinach:
class Engine:
"""Spinach Engine coordinating a broker with workers.
:arg broker: instance of a :class:`Broker`
:arg namespace: name of the namespace used by the Engine. When different
Engines use the same Redis server, they must use different
namespaces to isolate themselves.
"""

def __init__(self, broker: Broker, namespace: str=DEFAULT_NAMESPACE):
self._broker = broker
Expand All @@ -31,9 +38,18 @@ def __init__(self, broker: Broker, namespace: str=DEFAULT_NAMESPACE):

@property
def namespace(self) -> str:
"""Namespace the Engine uses."""
return self._namespace

def attach_tasks(self, tasks: Tasks):
"""Attach a set of tasks.
A task cannot be scheduled or executed before it is attached to an
Engine.
>>> tasks = Tasks()
>>> spin.attach_tasks(tasks)
"""
self._tasks.update(tasks.tasks)
tasks._spin = self

Expand All @@ -52,19 +68,23 @@ def execute(self, task_name: str, *args, **kwargs):
def schedule(self, task_name: str, *args, **kwargs):
"""Schedule a job to be executed as soon as possible.
:arg task_name: Name to the task to execute
:arg task_name: name of the task to execute
:arg args: args to be passed to the task function
:arg kwargs: kwargs to be passed to the task function
"""
at = datetime.now(timezone.utc)
return self.schedule_at(task_name, at, *args, **kwargs)

def schedule_at(self, task_name: str, at: datetime, *args, **kwargs):
"""Schedule a job to be executed in the future.
:arg task_name: Name to the task to execute
:arg at: Date at which the job should start. It is advised to pass a
:arg task_name: name of the task to execute
:arg at: date at which the job should start. It is advised to pass a
timezone aware datetime to lift any ambiguity. However if a
timezone naive datetime if given, it will be assumed to
contain UTC time.
:arg args: args to be passed to the task function
:arg kwargs: kwargs to be passed to the task function
"""
task = self._get_task(task_name)
job = Job(task.name, task.queue, at, task.max_retries, task_args=args,
Expand Down Expand Up @@ -97,8 +117,16 @@ def _arbiter_func(self):
logger.debug('Arbiter terminated')

def start_workers(self, number: int=5, queue=DEFAULT_QUEUE, block=True):
"""Start the worker threads.
:arg number: number of worker threads to launch
:arg queue: name of the queue to consume, see :doc:`queues`
:arg block: whether to block the calling thread until a signal arrives
and workers get terminated
"""
if self._arbiter or self._workers:
raise RuntimeError('Workers can only be started once')

self._working_queue = queue

# Start the broker
Expand Down Expand Up @@ -126,6 +154,7 @@ def start_workers(self, number: int=5, queue=DEFAULT_QUEUE, block=True):
self.stop_workers()

def stop_workers(self):
"""Stop the workers and wait for them to terminate."""
self._must_stop.set()
self._workers.stop()
self._broker.stop()
Expand Down
12 changes: 7 additions & 5 deletions spinach/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def _require_attached_tasks(self):
if self._spin is None:
raise RuntimeError(
'Cannot execute tasks until the tasks have been attached to '
'a Spinach instance.'
'a Spinach Engine.'
)

def schedule(self, task_name: str, *args, **kwargs):
Expand All @@ -130,7 +130,7 @@ def schedule(self, task_name: str, *args, **kwargs):
:arg kwargs: kwargs to be passed to the task function
This method can only be used once tasks have been attached to a
:class:`Spinach` instance.
Spinach :class:`Engine`.
"""
self._require_attached_tasks()
self._spin.schedule(task_name, *args, **kwargs)
Expand All @@ -139,13 +139,15 @@ def schedule_at(self, task_name: str, at: datetime, *args, **kwargs):
"""Schedule a job in the future
:arg task_name: name of the task to execute in the background
:arg at: datetime instance representing the date at which the job
should start
:arg at: Date at which the job should start. It is advised to pass a
timezone aware datetime to lift any ambiguity. However if a
timezone naive datetime if given, it will be assumed to
contain UTC time.
:arg args: args to be passed to the task function
:arg kwargs: kwargs to be passed to the task function
This method can only be used once tasks have been attached to a
:class:`Spinach` instance.
Spinach :class:`Engine`.
"""
self._require_attached_tasks()
self._spin.schedule_at(task_name, at, *args, **kwargs)
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def fromtimestamp(cls, *args, **kwargs):
monkeypatch.setattr('spinach.brokers.base.datetime', MyDatetime)
monkeypatch.setattr('spinach.brokers.redis.datetime', MyDatetime)
monkeypatch.setattr('spinach.job.datetime', MyDatetime)
monkeypatch.setattr('spinach.spinach.datetime', MyDatetime)
monkeypatch.setattr('spinach.engine.datetime', MyDatetime)


def get_now() -> datetime:
Expand Down
4 changes: 2 additions & 2 deletions tests/contrib/test_sentry.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import pytest

from spinach import Spinach, MemoryBroker, Tasks
from spinach import Engine, MemoryBroker, Tasks
from spinach.contrib.sentry import register_sentry


Expand All @@ -19,7 +19,7 @@ def fail():
def success():
return

s = Spinach(MemoryBroker(), namespace='tests')
s = Engine(MemoryBroker(), namespace='tests')
s.attach_tasks(tasks)
s.start_workers(number=1, block=False)
yield s
Expand Down
6 changes: 3 additions & 3 deletions tests/test_spinach.py → tests/test_engine.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
from datetime import datetime, timedelta, timezone
from datetime import datetime, timezone

import pytest

from spinach import Spinach, MemoryBroker
from spinach import Engine, MemoryBroker
from spinach.job import Job, JobStatus


@pytest.fixture
def spin():
s = Spinach(MemoryBroker(), namespace='tests')
s = Engine(MemoryBroker(), namespace='tests')
s.start_workers(number=1, block=False)
yield s
s.stop_workers()
Expand Down

0 comments on commit f33114a

Please sign in to comment.