Skip to content

Commit

Permalink
Move job progression logic from broker to arbiter
Browse files Browse the repository at this point in the history
  • Loading branch information
NicolasLM committed Feb 21, 2018
1 parent c84f97d commit aa47d79
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 79 deletions.
29 changes: 2 additions & 27 deletions spinach/brokers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@
from typing import Optional
import uuid

from ..job import Job, JobStatus
from ..job import Job
from ..const import WAIT_FOR_EVENT_MAX_SECONDS
from ..task import exponential_backoff
from .. import signals

logger = getLogger('spinach.broker')

Expand Down Expand Up @@ -61,31 +59,8 @@ def _to_namespaced(self, value: str) -> str:
def enqueue_job(self, job: Job):
"""Add a job to a queue."""

def job_ran(self, job: Job, err: Optional[Exception]):
"""Notification that a job has been ran (successfully or not)."""
# Todo: move this code in the arbiter
if not err:
job.status = JobStatus.SUCCEEDED
self._remove_job_from_running(job)
self._something_happened.set()
return

if job.should_retry:
job.retries += 1
job.at = (
datetime.now(timezone.utc) + exponential_backoff(job.retries)
)
signals.job_schedule_retry.send(self._namespace, job=job, err=err)
self.enqueue_job(job)
return

job.status = JobStatus.FAILED
signals.job_failed.send(self._namespace, job=job, err=err)
self._remove_job_from_running(job)
self._something_happened.set()

@abstractmethod
def _remove_job_from_running(self, job: Job):
def remove_job_from_running(self, job: Job):
"""Remove a job from the list of running ones."""

@abstractmethod
Expand Down
4 changes: 2 additions & 2 deletions spinach/brokers/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ def flush(self):
self._queues = dict()
self._future_jobs = list()

def _remove_job_from_running(self, job: Job):
def remove_job_from_running(self, job: Job):
"""Remove a job from the list of running ones.
Easy, the memory broker doesn't track running jobs. If the broker dies
there is nothing we can do.
"""
pass
self._something_happened.set()
17 changes: 8 additions & 9 deletions spinach/brokers/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,13 @@ def get_job_from_queue(self, queue: str) -> Optional[Job]:

return Job.deserialize(job_json_string.decode())

def _remove_job_from_running(self, job: Job):
if job.max_retries == 0:
return

self._r.hdel(
self._to_namespaced(RUNNING_JOBS_KEY.format(self._id)),
str(job.id)
)
def remove_job_from_running(self, job: Job):
if job.max_retries > 0:
self._r.hdel(
self._to_namespaced(RUNNING_JOBS_KEY.format(self._id)),
str(job.id)
)
self._something_happened.set()

def _subscriber_func(self):
logger.debug('Redis broker subscriber started')
Expand All @@ -126,7 +125,7 @@ def _subscriber_func(self):

# Consume all messages
while pub_sub.get_message(timeout=0):
pass
pass # noqa

logger.debug('Got a message from channel %s', channel_name)
self._something_happened.set()
Expand Down
23 changes: 20 additions & 3 deletions spinach/spinach.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
import time
from typing import Optional

from .task import Task, Tasks
from .task import Task, Tasks, exponential_backoff
from .job import Job, JobStatus
from .brokers.base import Broker
from .const import DEFAULT_QUEUE, DEFAULT_NAMESPACE
from .worker import Workers
from . import signals


logger = getLogger(__name__)
Expand Down Expand Up @@ -131,5 +132,21 @@ def stop_workers(self):

def _job_finished_callback(self, job: Job, err: Optional[Exception]):
"""Function called by a worker when a job is finished."""
assert job.status is JobStatus.RUNNING
self._broker.job_ran(job, err)
if not err:
job.status = JobStatus.SUCCEEDED
self._broker.remove_job_from_running(job)
return

if job.should_retry:
job.retries += 1
job.at = (
datetime.now(timezone.utc) + exponential_backoff(job.retries)
)
signals.job_schedule_retry.send(self._namespace, job=job, err=err)
self._broker.enqueue_job(job)
# No need to remove job from running, enqueue does it
return

job.status = JobStatus.FAILED
signals.job_failed.send(self._namespace, job=job, err=err)
self._broker.remove_job_from_running(job)
20 changes: 0 additions & 20 deletions tests/test_brokers.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,26 +89,6 @@ def test_flush(broker):
assert broker.next_future_job_delta is None


def test_job_ran(broker):
now = datetime.now(timezone.utc)
job = Job('foo_task', 'foo_queue', now, 0,
task_args=(1, 2), task_kwargs={'foo': 'bar'})

job.status = JobStatus.RUNNING
broker.job_ran(job, None)
assert job.status is JobStatus.SUCCEEDED

job.status = JobStatus.RUNNING
broker.job_ran(job, RuntimeError('Error'))
assert job.status is JobStatus.FAILED

job.status = JobStatus.RUNNING
job.max_retries = 10
broker.job_ran(job, RuntimeError('Error'))
assert job.status is JobStatus.WAITING
assert job.at > now


def test_repr(broker):
assert broker.__class__.__name__ in repr(broker)
assert str(broker._id) in repr(broker)
24 changes: 6 additions & 18 deletions tests/test_redis_brokers.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ def test_redis_flush(broker):
broker._r.delete('tests2/foo')


@patch('spinach.brokers.base.exponential_backoff')
def test_running_job(mock_eb, broker):
mock_eb.return_value = timedelta()
def test_running_job(broker):
running_jobs_key = broker._to_namespaced(
RUNNING_JOBS_KEY.format(broker._id)
)
Expand All @@ -40,6 +38,8 @@ def test_running_job(mock_eb, broker):
assert broker._r.hget(running_jobs_key, str(job.id)) is None
broker.get_job_from_queue('foo_queue')
assert broker._r.hget(running_jobs_key, str(job.id)) is None
# Try to remove it, even if it doesn't exist in running
broker.remove_job_from_running(job)

# Idempotent job - get from queue
job = Job('foo_task', 'foo_queue', datetime.now(timezone.utc), 10)
Expand All @@ -53,7 +53,8 @@ def test_running_job(mock_eb, broker):
)

# Idempotent job - re-enqueue after job ran with error
broker.job_ran(job, err=ZeroDivisionError())
job.retries += 1
broker.enqueue_job(job)
assert broker._r.hget(running_jobs_key, str(job.id)) is None
broker.get_job_from_queue('foo_queue')
job.status = JobStatus.RUNNING
Expand All @@ -63,19 +64,6 @@ def test_running_job(mock_eb, broker):
)

# Idempotent job - job succeeded
broker.job_ran(job, err=None)
broker.remove_job_from_running(job)
assert broker._r.hget(running_jobs_key, str(job.id)) is None
assert broker.get_job_from_queue('foo_queue') is None

# Idempotent job - job failed
job.status = JobStatus.NOT_SET
job.retries = 999
broker.enqueue_job(job)
job = broker.get_job_from_queue('foo_queue')
assert (
Job.deserialize(broker._r.hget(running_jobs_key, str(job.id)).decode())
== job
)
broker.job_ran(job, err=ZeroDivisionError())
assert broker._r.hget(running_jobs_key, str(job.id)) is None
assert job.status == JobStatus.FAILED
34 changes: 34 additions & 0 deletions tests/test_spinach.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from datetime import datetime, timedelta, timezone

import pytest

from spinach import Spinach, MemoryBroker
from spinach.job import Job, JobStatus


@pytest.fixture
def spin():
s = Spinach(MemoryBroker(), namespace='tests')
s.start_workers(number=1, block=False)
yield s
s.stop_workers()


def test_job_finished_callback(spin):
now = datetime.now(timezone.utc)
job = Job('foo_task', 'foo_queue', now, 0,
task_args=(1, 2), task_kwargs={'foo': 'bar'})

job.status = JobStatus.RUNNING
spin._job_finished_callback(job, None)
assert job.status is JobStatus.SUCCEEDED

job.status = JobStatus.RUNNING
spin._job_finished_callback(job, RuntimeError('Error'))
assert job.status is JobStatus.FAILED

job.status = JobStatus.RUNNING
job.max_retries = 10
spin._job_finished_callback(job, RuntimeError('Error'))
assert job.status is JobStatus.WAITING
assert job.at > now

0 comments on commit aa47d79

Please sign in to comment.