From f33114a66a81d20fec37736a321063b7206a464c Mon Sep 17 00:00:00 2001 From: Nicolas Le Manchet Date: Sat, 24 Feb 2018 10:14:56 +0100 Subject: [PATCH] Rename Spinach instance to Engine --- README.rst | 4 +-- doc/index.rst | 1 + doc/user/engine.rst | 32 ++++++++++++++++++++ doc/user/integrations.rst | 3 +- doc/user/production.rst | 6 ++-- doc/user/queues.rst | 2 +- doc/user/signals.rst | 19 ++++++------ doc/user/tasks.rst | 10 +++--- examples/queues.py | 4 +-- examples/quickstart.py | 4 +-- spinach/__init__.py | 2 +- spinach/contrib/sentry.py | 2 +- spinach/{spinach.py => engine.py} | 37 ++++++++++++++++++++--- spinach/task.py | 12 +++++--- tests/conftest.py | 2 +- tests/contrib/test_sentry.py | 4 +-- tests/{test_spinach.py => test_engine.py} | 6 ++-- 17 files changed, 109 insertions(+), 41 deletions(-) create mode 100644 doc/user/engine.rst rename spinach/{spinach.py => engine.py} (77%) rename tests/{test_spinach.py => test_engine.py} (84%) diff --git a/README.rst b/README.rst index d32f444..411cb19 100644 --- a/README.rst +++ b/README.rst @@ -33,7 +33,7 @@ Create task and schedule two jobs, one executed now and one later: from datetime import datetime, timedelta, timezone - from spinach import Tasks, MemoryBroker, Spinach + from spinach import Engine, Tasks, MemoryBroker tasks = Tasks() @@ -43,7 +43,7 @@ Create task and schedule two jobs, one executed now and one later: print('Computed {} + {} = {}'.format(a, b, a + b)) - spin = Spinach(MemoryBroker()) + spin = Engine(MemoryBroker()) spin.attach_tasks(tasks) # Schedule a job to be executed ASAP diff --git a/doc/index.rst b/doc/index.rst index 7125eac..a158b2d 100644 --- a/doc/index.rst +++ b/doc/index.rst @@ -29,6 +29,7 @@ Getting started with spinach: user/install user/tasks user/jobs + user/engine user/queues user/integrations user/signals diff --git a/doc/user/engine.rst b/doc/user/engine.rst new file mode 100644 index 0000000..448ec5d --- /dev/null +++ b/doc/user/engine.rst @@ -0,0 +1,32 @@ +.. _engine: + +Engine +====== + +The Spinach :class:`Engine` is what connects tasks, jobs, brokers and workers +together. + +It is possible, but unusual, to have multiple Engines running in the same +Python interpreter. + +.. autoclass:: spinach.engine.Engine + :members: + +Namespace +--------- + +When multiple Spinach Engines use the same Redis server, for example when +production and staging share the same database, different namespaces are used +to make sure they do not step on each other's feet. + +The production application would contain:: + + spin = Engine(RedisBroker(), namespace='prod') + +While the staging application would contain:: + + spin = Engine(RedisBroker(), namespace='stg') + +.. note:: Using different Redis database numbers (0, 1, 2...) for different + environments is not enough as Redis pubsubs are shared among + databases. Namespaces solve this problem. diff --git a/doc/user/integrations.rst b/doc/user/integrations.rst index cffaca6..ff1bedf 100644 --- a/doc/user/integrations.rst +++ b/doc/user/integrations.rst @@ -46,6 +46,7 @@ The integration just needs to be registered before starting workers:: raven_client = Client('https://sentry_dsn/42') register_sentry(raven_client) - spin = Spinach(MemoryBroker()) + spin = Engine(MemoryBroker()) spin.start_workers() +.. autofunction:: spinach.contrib.sentry.register_sentry diff --git a/doc/user/production.rst b/doc/user/production.rst index dba7cc7..1113bcd 100644 --- a/doc/user/production.rst +++ b/doc/user/production.rst @@ -40,13 +40,15 @@ Spinach: - Tasks that are NOT safe to be retried have their `max_retries` set to `0` - Tasks that are safe to be retried have their `max_retries` set to a positive number -- Retries happen after an exponential delay with randomized jitter (the default) +- Retries happen after an exponential delay with randomized jitter (the + default) - Task `args` and `kwargs` are JSON serializable - The user's code is thread-safe - Tasks do not store state in the process between invocations - Configure logging and send exceptions to Sentry, see :doc:`integrations` - Use different queues if tasks have different usage pattens, see :doc:`queues` -- Use different namespaces if multiple Spinach applications share the same Redis +- Use different namespaces if multiple Spinach applications share the same + Redis, see :doc:`engine` Redis: diff --git a/doc/user/queues.rst b/doc/user/queues.rst index 8146402..4a8a206 100644 --- a/doc/user/queues.rst +++ b/doc/user/queues.rst @@ -20,5 +20,5 @@ workers to executing only tasks of this particular queue. .. note:: By default all tasks and all workers use the ``spinach`` queue .. note:: Namespaces and queues are different concepts. While queues share the - same Spinach application, namespaces make two Spinach applications + same Spinach :class:`Engine`, namespaces make two Spinach Engines invisible to each other while still using the same broker. diff --git a/doc/user/signals.rst b/doc/user/signals.rst index e777243..b0ad9ef 100644 --- a/doc/user/signals.rst +++ b/doc/user/signals.rst @@ -21,25 +21,26 @@ Subscribing to a signal is done via its ``connect`` decorator:: print('Job {} started'.format(job)) The first argument given to your function is always the namespace of your -Spinach, the following arguments depend on the signal itself. +Spinach :class:`Engine`, the following arguments depend on the signal itself. -Subscribing to signals of a specific Spinach -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Subscribing to signals of a specific Spinach Engine +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -As your application gets bigger you may end up running multiple ``Spinach`` in +As your application gets bigger you may end up running multiple Engines in the same interpreter. The ``connect_via`` decorator allows to subscribe to the -signals sent by a specific ``Spinach``:: +signals sent by a specific Spinach :class:`Engine`:: - from spinach import Spinach, MemoryBroker, signals + from spinach import Engine, MemoryBroker, signals - foo_spin = Spinach(MemoryBroker(), namespace='foo') - bar_spin = Spinach(MemoryBroker(), namespace='bar') + foo_spin = Engine(MemoryBroker(), namespace='foo') + bar_spin = Engine(MemoryBroker(), namespace='bar') @signals.job_started.connect_via(foo_spin.namespace) def job_started(namespace, job, **kwargs): print('Job {} started on Foo'.format(job)) -In this example only signals sent by the `foo` ``Spinach`` will be received. +In this example only signals sent by the `foo` :class:`Engine` will be +received. Available signals ----------------- diff --git a/doc/user/tasks.rst b/doc/user/tasks.rst index 27bf31a..17a4dfa 100644 --- a/doc/user/tasks.rst +++ b/doc/user/tasks.rst @@ -21,8 +21,8 @@ To define a task:: Retries and Idempotency ----------------------- -Spinach knows two kinds of tasks: the ones that are idempotent and the ones that -are not. Since Spinach cannot guess if a task code is safe to be retried +Spinach knows two kinds of tasks: the ones that are idempotent and the ones +that are not. Since Spinach cannot guess if a task code is safe to be retried multiple times, it must be annotated when the task is created. Non-Idempotent Tasks @@ -60,12 +60,12 @@ Idempotent tasks are defined with a positive `max_retries` value:: Tasks Registry -------------- -Before being attached to a :class:`Spinach` instance, tasks are created inside +Before being attached to a Spinach :class:`Engine`, tasks are created inside a :class:`Tasks` registry. This may seem cumbersome for trivial applications, like the examples in this -documentation, but there is a good reason not to directly attach tasks to a -:class:`Spinach` instance. +documentation, but there is a good reason not to directly attach tasks to an +:class:`Engine`. Attaching tasks to a dumb :class:`Tasks` registry instead allows to compose large applications in smaller units independent from each other, the same way a diff --git a/examples/queues.py b/examples/queues.py index 96d3649..13a1513 100644 --- a/examples/queues.py +++ b/examples/queues.py @@ -1,7 +1,7 @@ import time import logging -from spinach import Spinach, Tasks, MemoryBroker +from spinach import Engine, Tasks, MemoryBroker logging.basicConfig( @@ -21,7 +21,7 @@ def slow(): time.sleep(10) -spin = Spinach(MemoryBroker()) +spin = Engine(MemoryBroker()) spin.attach_tasks(tasks) tasks.schedule('slow') diff --git a/examples/quickstart.py b/examples/quickstart.py index 9095dfb..187b16e 100644 --- a/examples/quickstart.py +++ b/examples/quickstart.py @@ -1,4 +1,4 @@ -from spinach import Spinach, Tasks, MemoryBroker +from spinach import Engine, Tasks, MemoryBroker tasks = Tasks() @@ -8,7 +8,7 @@ def compute(a, b): print('Computed {} + {} = {}'.format(a, b, a + b)) -spin = Spinach(MemoryBroker()) +spin = Engine(MemoryBroker()) spin.attach_tasks(tasks) # Schedule a job to be executed ASAP diff --git a/spinach/__init__.py b/spinach/__init__.py index 4e68626..0bec3c4 100644 --- a/spinach/__init__.py +++ b/spinach/__init__.py @@ -1,7 +1,7 @@ from .brokers.memory import MemoryBroker from .brokers.redis import RedisBroker from .const import VERSION -from .spinach import Spinach +from .engine import Engine from .task import Tasks __version__ = VERSION diff --git a/spinach/contrib/sentry.py b/spinach/contrib/sentry.py index 89186e2..34abcf4 100644 --- a/spinach/contrib/sentry.py +++ b/spinach/contrib/sentry.py @@ -11,7 +11,7 @@ def register_sentry(raven_client, namespace: Optional[str]=None): :param raven_client: configured Raven client used to sent errors to Sentry :param namespace: optionally only register the Sentry integration for a - particular Spinach instance. + particular Spinach :class:`Engine`. """ @signals.job_started.connect_via(namespace) diff --git a/spinach/spinach.py b/spinach/engine.py similarity index 77% rename from spinach/spinach.py rename to spinach/engine.py index 49ff06f..90e200a 100644 --- a/spinach/spinach.py +++ b/spinach/engine.py @@ -15,7 +15,14 @@ logger = getLogger(__name__) -class Spinach: +class Engine: + """Spinach Engine coordinating a broker with workers. + + :arg broker: instance of a :class:`Broker` + :arg namespace: name of the namespace used by the Engine. When different + Engines use the same Redis server, they must use different + namespaces to isolate themselves. + """ def __init__(self, broker: Broker, namespace: str=DEFAULT_NAMESPACE): self._broker = broker @@ -31,9 +38,18 @@ def __init__(self, broker: Broker, namespace: str=DEFAULT_NAMESPACE): @property def namespace(self) -> str: + """Namespace the Engine uses.""" return self._namespace def attach_tasks(self, tasks: Tasks): + """Attach a set of tasks. + + A task cannot be scheduled or executed before it is attached to an + Engine. + + >>> tasks = Tasks() + >>> spin.attach_tasks(tasks) + """ self._tasks.update(tasks.tasks) tasks._spin = self @@ -52,7 +68,9 @@ def execute(self, task_name: str, *args, **kwargs): def schedule(self, task_name: str, *args, **kwargs): """Schedule a job to be executed as soon as possible. - :arg task_name: Name to the task to execute + :arg task_name: name of the task to execute + :arg args: args to be passed to the task function + :arg kwargs: kwargs to be passed to the task function """ at = datetime.now(timezone.utc) return self.schedule_at(task_name, at, *args, **kwargs) @@ -60,11 +78,13 @@ def schedule(self, task_name: str, *args, **kwargs): def schedule_at(self, task_name: str, at: datetime, *args, **kwargs): """Schedule a job to be executed in the future. - :arg task_name: Name to the task to execute - :arg at: Date at which the job should start. It is advised to pass a + :arg task_name: name of the task to execute + :arg at: date at which the job should start. It is advised to pass a timezone aware datetime to lift any ambiguity. However if a timezone naive datetime if given, it will be assumed to contain UTC time. + :arg args: args to be passed to the task function + :arg kwargs: kwargs to be passed to the task function """ task = self._get_task(task_name) job = Job(task.name, task.queue, at, task.max_retries, task_args=args, @@ -97,8 +117,16 @@ def _arbiter_func(self): logger.debug('Arbiter terminated') def start_workers(self, number: int=5, queue=DEFAULT_QUEUE, block=True): + """Start the worker threads. + + :arg number: number of worker threads to launch + :arg queue: name of the queue to consume, see :doc:`queues` + :arg block: whether to block the calling thread until a signal arrives + and workers get terminated + """ if self._arbiter or self._workers: raise RuntimeError('Workers can only be started once') + self._working_queue = queue # Start the broker @@ -126,6 +154,7 @@ def start_workers(self, number: int=5, queue=DEFAULT_QUEUE, block=True): self.stop_workers() def stop_workers(self): + """Stop the workers and wait for them to terminate.""" self._must_stop.set() self._workers.stop() self._broker.stop() diff --git a/spinach/task.py b/spinach/task.py index 00d73ce..ddd1052 100644 --- a/spinach/task.py +++ b/spinach/task.py @@ -119,7 +119,7 @@ def _require_attached_tasks(self): if self._spin is None: raise RuntimeError( 'Cannot execute tasks until the tasks have been attached to ' - 'a Spinach instance.' + 'a Spinach Engine.' ) def schedule(self, task_name: str, *args, **kwargs): @@ -130,7 +130,7 @@ def schedule(self, task_name: str, *args, **kwargs): :arg kwargs: kwargs to be passed to the task function This method can only be used once tasks have been attached to a - :class:`Spinach` instance. + Spinach :class:`Engine`. """ self._require_attached_tasks() self._spin.schedule(task_name, *args, **kwargs) @@ -139,13 +139,15 @@ def schedule_at(self, task_name: str, at: datetime, *args, **kwargs): """Schedule a job in the future :arg task_name: name of the task to execute in the background - :arg at: datetime instance representing the date at which the job - should start + :arg at: Date at which the job should start. It is advised to pass a + timezone aware datetime to lift any ambiguity. However if a + timezone naive datetime if given, it will be assumed to + contain UTC time. :arg args: args to be passed to the task function :arg kwargs: kwargs to be passed to the task function This method can only be used once tasks have been attached to a - :class:`Spinach` instance. + Spinach :class:`Engine`. """ self._require_attached_tasks() self._spin.schedule_at(task_name, at, *args, **kwargs) diff --git a/tests/conftest.py b/tests/conftest.py index e5f6e0f..48897ac 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -34,7 +34,7 @@ def fromtimestamp(cls, *args, **kwargs): monkeypatch.setattr('spinach.brokers.base.datetime', MyDatetime) monkeypatch.setattr('spinach.brokers.redis.datetime', MyDatetime) monkeypatch.setattr('spinach.job.datetime', MyDatetime) - monkeypatch.setattr('spinach.spinach.datetime', MyDatetime) + monkeypatch.setattr('spinach.engine.datetime', MyDatetime) def get_now() -> datetime: diff --git a/tests/contrib/test_sentry.py b/tests/contrib/test_sentry.py index 1b543f0..71358d3 100644 --- a/tests/contrib/test_sentry.py +++ b/tests/contrib/test_sentry.py @@ -3,7 +3,7 @@ import pytest -from spinach import Spinach, MemoryBroker, Tasks +from spinach import Engine, MemoryBroker, Tasks from spinach.contrib.sentry import register_sentry @@ -19,7 +19,7 @@ def fail(): def success(): return - s = Spinach(MemoryBroker(), namespace='tests') + s = Engine(MemoryBroker(), namespace='tests') s.attach_tasks(tasks) s.start_workers(number=1, block=False) yield s diff --git a/tests/test_spinach.py b/tests/test_engine.py similarity index 84% rename from tests/test_spinach.py rename to tests/test_engine.py index 43e35f0..1f70137 100644 --- a/tests/test_spinach.py +++ b/tests/test_engine.py @@ -1,14 +1,14 @@ -from datetime import datetime, timedelta, timezone +from datetime import datetime, timezone import pytest -from spinach import Spinach, MemoryBroker +from spinach import Engine, MemoryBroker from spinach.job import Job, JobStatus @pytest.fixture def spin(): - s = Spinach(MemoryBroker(), namespace='tests') + s = Engine(MemoryBroker(), namespace='tests') s.start_workers(number=1, block=False) yield s s.stop_workers()