Skip to content

Commit

Permalink
Allow to schedule jobs in batch
Browse files Browse the repository at this point in the history
  • Loading branch information
NicolasLM committed Feb 25, 2018
1 parent 8e7965b commit 2c63800
Show file tree
Hide file tree
Showing 16 changed files with 235 additions and 65 deletions.
14 changes: 14 additions & 0 deletions doc/user/design.rst
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,17 @@ Spinach app through your Python code. You can read settings from environment
variables, from a file or anything else possible in Python.

It is then easy to use it to create your own entrypoint to launch the workers.

Schedule tasks in batch
-----------------------

A pattern that is used frequently with task queues is to periodically scan
all entities and schedule an individual task for each entity that needs further
work. For instance, closing the accounts of members who haven't logged in for
a year.

With Celery this results in having to do as many round-trips to the broker
as there are tasks to schedule. There are some workarounds but they just move
the problem elsewhere.

Spinach supports sending tasks to the broker in batch to avoid this overhead.
6 changes: 6 additions & 0 deletions doc/user/tasks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,9 @@ Django project is composed of many small Django apps.

.. autoclass:: spinach.task.Tasks
:members:

Batch
-----

.. autoclass:: spinach.task.Batch
:members:
2 changes: 1 addition & 1 deletion spinach/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
from .brokers.redis import RedisBroker
from .const import VERSION
from .engine import Engine
from .task import Tasks, RetryException
from .task import Tasks, Batch, RetryException

__version__ = VERSION
6 changes: 3 additions & 3 deletions spinach/brokers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from datetime import datetime, timezone
from logging import getLogger
import threading
from typing import Optional
from typing import Optional, Iterable
import uuid

from ..job import Job
Expand Down Expand Up @@ -56,8 +56,8 @@ def _to_namespaced(self, value: str) -> str:
return '{}/{}'.format(self.namespace, value)

@abstractmethod
def enqueue_job(self, job: Job):
"""Add a job to a queue."""
def enqueue_jobs(self, jobs: Iterable[Job]):
"""Enqueue a batch of jobs."""

@abstractmethod
def remove_job_from_running(self, job: Job):
Expand Down
24 changes: 13 additions & 11 deletions spinach/brokers/memory.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from logging import getLogger
from queue import Queue, Empty
import threading
from typing import Optional
from typing import Optional, Iterable

from .base import Broker
from ..job import Job, JobStatus
Expand All @@ -28,16 +28,18 @@ def _get_queue(self, queue_name: str):
self._queues[queue_name] = queue
return queue

def enqueue_job(self, job: Job):
if job.should_start:
job.status = JobStatus.QUEUED
queue = self._get_queue(job.queue)
queue.put(job.serialize())
else:
with self._lock:
job.status = JobStatus.WAITING
self._future_jobs.append(job.serialize())
self._future_jobs.sort(key=lambda j: Job.deserialize(j).at)
def enqueue_jobs(self, jobs: Iterable[Job]):
"""Enqueue a batch of jobs.

Jobs whose ``should_start`` is true are marked ``QUEUED``, serialized and
put on the queue named by ``job.queue``. The others are marked
``WAITING``, serialized and stored in ``self._future_jobs``.

:arg jobs: jobs to enqueue
"""
for job in jobs:
if job.should_start:
job.status = JobStatus.QUEUED
queue = self._get_queue(job.queue)
queue.put(job.serialize())
else:
# The list of future jobs is only modified while holding the lock.
with self._lock:
job.status = JobStatus.WAITING
self._future_jobs.append(job.serialize())
# Keep future jobs ordered by their ``at`` date; entries are stored
# serialized, so each one is deserialized to read its date.
self._future_jobs.sort(key=lambda j: Job.deserialize(j).at)
# NOTE(review): presumably wakes up whoever waits on this event - confirm.
self._something_happened.set()

def move_future_jobs(self) -> int:
Expand Down
34 changes: 20 additions & 14 deletions spinach/brokers/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from os import path
import socket
import threading
from typing import Optional
from typing import Optional, Iterable

from redis import StrictRedis
from redis.client import Script
Expand Down Expand Up @@ -50,28 +50,34 @@ def _run_script(self, script: Script, *args):
args = [str(self._id)] + list(args)
return script(args=args)

def enqueue_job(self, job: Job):
"""Add a job to a queue."""
if job.should_start:
job.status = JobStatus.QUEUED
def enqueue_jobs(self, jobs: Iterable[Job]):
"""Enqueue a batch of jobs."""
jobs_to_queue = list()
future_jobs = list()
for job in jobs:
if job.should_start:
job.status = JobStatus.QUEUED
jobs_to_queue.append(job.serialize())
else:
job.status = JobStatus.WAITING
future_jobs.append(job.serialize())

if jobs_to_queue:
self._run_script(
self._enqueue_job,
self._to_namespaced(job.queue),
self._to_namespaced(NOTIFICATIONS_KEY),
job.serialize(),
job.id,
self._to_namespaced(RUNNING_JOBS_KEY.format(self._id)),
self.namespace,
*jobs_to_queue
)
else:
job.status = JobStatus.WAITING

if future_jobs:
self._run_script(
self._enqueue_future_job,
self._to_namespaced(FUTURE_JOBS_KEY),
self._to_namespaced(NOTIFICATIONS_KEY),
job.at_timestamp,
job.serialize(),
job.id,
self._to_namespaced(RUNNING_JOBS_KEY.format(self._id)),
self._to_namespaced(FUTURE_JOBS_KEY),
*future_jobs
)

def move_future_jobs(self) -> int:
Expand Down
22 changes: 13 additions & 9 deletions spinach/brokers/redis_scripts/enqueue_future_job.lua
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
local broker_id = ARGV[1]
local future_jobs = ARGV[2]
local notifications = ARGV[3]
local at_timestamp = ARGV[4]
local job_json = ARGV[5]
local job_id = ARGV[6]
local running_jobs_key = ARGV[7]
local notifications = ARGV[2]
local running_jobs_key = ARGV[3]
local future_jobs = ARGV[4]
-- jobs starting at ARGV[5]

redis.call('zadd', future_jobs, at_timestamp, job_json)
redis.call('hdel', running_jobs_key, job_id)
redis.call('publish', notifications, '')

for i=5, #ARGV do
local job_json = ARGV[i]
local job = cjson.decode(job_json)
local at_timestamp = job["at"] + 1 -- approximation to avoid starting a job before its real "at" date
redis.call('zadd', future_jobs, at_timestamp, job_json)
redis.call('hdel', running_jobs_key, job["id"])
end

redis.call('publish', notifications, '')
20 changes: 12 additions & 8 deletions spinach/brokers/redis_scripts/enqueue_job.lua
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
local broker_id = ARGV[1]
local queue = ARGV[2]
local notifications = ARGV[3]
local job_json = ARGV[4]
local job_id = ARGV[5]
local running_jobs_key = ARGV[6]
local notifications = ARGV[2]
local running_jobs_key = ARGV[3]
local namespace = ARGV[4]
-- jobs starting at ARGV[5]

redis.call('rpush', queue, job_json)
redis.call('hdel', running_jobs_key, job_id)
redis.call('publish', notifications, '')
for i=5, #ARGV do
local job_json = ARGV[i]
local job = cjson.decode(job_json)
local queue = string.format("%s/%s", namespace, job["queue"])
redis.call('rpush', queue, job_json)
redis.call('hdel', running_jobs_key, job["id"])
end

redis.call('publish', notifications, '')
23 changes: 20 additions & 3 deletions spinach/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import time
from typing import Optional

from .task import Task, Tasks, RetryException, exponential_backoff
from .task import Task, Tasks, Batch, RetryException, exponential_backoff
from .utils import human_duration, run_forever
from .job import Job, JobStatus
from .brokers.base import Broker
Expand Down Expand Up @@ -91,7 +91,24 @@ def schedule_at(self, task_name: str, at: datetime, *args, **kwargs):
task = self._get_task(task_name)
job = Job(task.name, task.queue, at, task.max_retries, task_args=args,
task_kwargs=kwargs)
return self._broker.enqueue_job(job)
return self._broker.enqueue_jobs([job])

def schedule_batch(self, batch: Batch):
"""Schedule many jobs at once.

Scheduling jobs in batches enqueues them quickly by avoiding
round-trips to the broker.

:arg batch: :class:`Batch` instance containing jobs to schedule
:return: whatever the broker's ``enqueue_jobs`` returns
"""
jobs = list()
# Each batch entry is unpacked as (task_name, at, args, kwargs).
for task_name, at, args, kwargs in batch.jobs_to_create:
# Queue and max_retries come from the registered task, not the batch.
task = self._get_task(task_name)
jobs.append(
Job(task.name, task.queue, at, task.max_retries,
task_args=args, task_kwargs=kwargs)
)
# All jobs are handed to the broker in a single call.
return self._broker.enqueue_jobs(jobs)

def _arbiter_func(self):
logger.debug('Arbiter started')
Expand Down Expand Up @@ -186,7 +203,7 @@ def _job_finished_callback(self, job: Job, duration: float,

signals.job_schedule_retry.send(self._namespace, job=job, err=err)
# No need to remove job from running, enqueue does it
self._broker.enqueue_job(job)
self._broker.enqueue_jobs([job])

log_args = (
job.retries, job.max_retries + 1, job, duration,
Expand Down
63 changes: 60 additions & 3 deletions spinach/task.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
import functools
from typing import Optional, Callable
from numbers import Number
Expand Down Expand Up @@ -123,7 +123,7 @@ def _require_attached_tasks(self):
)

def schedule(self, task_name: str, *args, **kwargs):
"""Schedule a job.
"""Schedule a job to be executed as soon as possible.
:arg task_name: name of the task to execute in the background
:arg args: args to be passed to the task function
Expand All @@ -136,7 +136,7 @@ def schedule(self, task_name: str, *args, **kwargs):
self._spin.schedule(task_name, *args, **kwargs)

def schedule_at(self, task_name: str, at: datetime, *args, **kwargs):
"""Schedule a job in the future
"""Schedule a job to be executed in the future.
:arg task_name: name of the task to execute in the background
:arg at: Date at which the job should start. It is advised to pass a
Expand All @@ -152,6 +152,63 @@ def schedule_at(self, task_name: str, at: datetime, *args, **kwargs):
self._require_attached_tasks()
self._spin.schedule_at(task_name, at, *args, **kwargs)

def schedule_batch(self, batch: 'Batch'):
"""Schedule many jobs at once.

Scheduling jobs in batches enqueues them quickly by avoiding
round-trips to the broker.

:arg batch: :class:`Batch` instance containing jobs to schedule
"""
# NOTE(review): presumably raises when no Engine is attached - confirm.
self._require_attached_tasks()
# Delegate to the object stored in ``self._spin``.
self._spin.schedule_batch(batch)


class Batch:
"""Container used to schedule many jobs at once.

Batching the scheduling of jobs avoids doing many round-trips
to the broker, reducing the overhead and the chance of errors associated
with doing network calls.

In this example 100 jobs are sent to Redis in one call:

>>> batch = Batch()
>>> for i in range(100):
... batch.schedule('compute', i)
...
>>> spin.schedule_batch(batch)

Once the :class:`Batch` is passed to the :class:`Engine` it should be
disposed of and not be reused.
"""

def __init__(self):
# Pending jobs, stored as (task_name, at, args, kwargs) tuples.
self.jobs_to_create = list()

def schedule(self, task_name: str, *args, **kwargs):
"""Add a job to be executed ASAP to the batch.

:arg task_name: name of the task to execute in the background
:arg args: args to be passed to the task function
:arg kwargs: kwargs to be passed to the task function
"""
# "ASAP" is expressed as a job due at the current UTC time.
at = datetime.now(timezone.utc)
self.schedule_at(task_name, at, *args, **kwargs)

def schedule_at(self, task_name: str, at: datetime, *args, **kwargs):
"""Add a job to be executed in the future to the batch.

:arg task_name: name of the task to execute in the background
:arg at: Date at which the job should start. It is advised to pass a
timezone aware datetime to lift any ambiguity. However if a
timezone naive datetime is given, it will be assumed to
contain UTC time.
:arg args: args to be passed to the task function
:arg kwargs: kwargs to be passed to the task function
"""
# Only record the request here; nothing is sent to the broker yet.
self.jobs_to_create.append((task_name, at, args, kwargs))


def exponential_backoff(attempt: int) -> timedelta:
"""Calculate a delay to retry using an exponential backoff algorithm.
Expand Down
1 change: 1 addition & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def fromtimestamp(cls, *args, **kwargs):
monkeypatch.setattr('spinach.brokers.redis.datetime', MyDatetime)
monkeypatch.setattr('spinach.job.datetime', MyDatetime)
monkeypatch.setattr('spinach.engine.datetime', MyDatetime)
monkeypatch.setattr('spinach.task.datetime', MyDatetime)


def get_now() -> datetime:
Expand Down
14 changes: 8 additions & 6 deletions tests/test_brokers.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def broker(request):
def test_normal_job(broker):
job = Job('foo_task', 'foo_queue', datetime.now(timezone.utc), 0,
task_args=(1, 2), task_kwargs={'foo': 'bar'})
broker.enqueue_job(job)
broker.enqueue_jobs([job])
assert job.status == JobStatus.QUEUED

job.status = JobStatus.RUNNING
Expand All @@ -39,7 +39,7 @@ def test_future_job(broker, patch_now):
job = Job('foo_task', 'foo_queue', get_now() + timedelta(minutes=10), 0,
task_args=(1, 2), task_kwargs={'foo': 'bar'})

broker.enqueue_job(job)
broker.enqueue_jobs([job])
assert job.status == JobStatus.WAITING
assert broker.get_job_from_queue('foo_queue') is None
assert broker.next_future_job_delta == 600
Expand Down Expand Up @@ -73,17 +73,19 @@ def test_wait_for_events_no_future_job(broker):
(timedelta(seconds=5), 5)
])
def test_wait_for_events_with_future_job(broker, patch_now, delta, timeout):
broker.enqueue_job(
Job('foo_task', 'foo_queue', get_now() + delta, 0)
broker.enqueue_jobs(
[Job('foo_task', 'foo_queue', get_now() + delta, 0)]
)
with patch.object(broker, '_something_happened') as mock_sh:
broker.wait_for_event()
mock_sh.wait.assert_called_once_with(timeout=timeout)


def test_flush(broker):
broker.enqueue_job(Job('t1', 'q1', get_now(), 0))
broker.enqueue_job(Job('t2', 'q2', get_now() + timedelta(seconds=10), 0))
broker.enqueue_jobs([
Job('t1', 'q1', get_now(), 0),
Job('t2', 'q2', get_now() + timedelta(seconds=10), 0)
])
broker.flush()
assert broker.get_job_from_queue('q1') is None
assert broker.next_future_job_delta is None
Expand Down
Loading

0 comments on commit 2c63800

Please sign in to comment.