diff --git a/.gitignore b/.gitignore
index 86fbfcb..5194427 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,3 +9,5 @@
 doc/_build
 .coverage
 .pytest_cache
+
+tags
diff --git a/spinach/brokers/base.py b/spinach/brokers/base.py
index 9108a40..486b03b 100644
--- a/spinach/brokers/base.py
+++ b/spinach/brokers/base.py
@@ -158,7 +158,9 @@ def get_all_brokers(self) -> List[Dict[str, Union[None, str, int]]]:
         """Return all registered brokers."""
 
     @abstractmethod
-    def enqueue_jobs_from_dead_broker(self, dead_broker_id: uuid.UUID) -> int:
+    def enqueue_jobs_from_dead_broker(
+        self, dead_broker_id: uuid.UUID
+    ) -> Tuple[int, list]:
         """Re-enqueue the jobs that were running on a broker.
 
         Only jobs that can be retried are moved back to the queue, the others
diff --git a/spinach/brokers/memory.py b/spinach/brokers/memory.py
index 2de21b1..ba4a641 100644
--- a/spinach/brokers/memory.py
+++ b/spinach/brokers/memory.py
@@ -187,9 +187,11 @@ def get_all_brokers(self) -> List[Dict[str, Union[None, str, int]]]:
         # A memory broker is not connected to any other broker
         return [self._get_broker_info()]
 
-    def enqueue_jobs_from_dead_broker(self, dead_broker_id: uuid.UUID) -> int:
+    def enqueue_jobs_from_dead_broker(
+        self, dead_broker_id: uuid.UUID
+    ) -> Tuple[int, list]:
         # A memory broker cannot be dead
-        return 0
+        return 0, []
 
     def remove_job_from_running(self, job: Job):
         """Remove a job from the list of running ones.
diff --git a/spinach/brokers/redis.py b/spinach/brokers/redis.py
index 1c98ef6..6fee72a 100644
--- a/spinach/brokers/redis.py
+++ b/spinach/brokers/redis.py
@@ -15,7 +15,7 @@
 from redis.commands.core import Script
 
 from ..brokers.base import Broker
-from ..job import Job, JobStatus
+from ..job import Job, JobStatus, advance_job_status
 from ..task import Task
 from ..const import (
     FUTURE_JOBS_KEY, NOTIFICATIONS_KEY, RUNNING_JOBS_KEY,
@@ -177,14 +177,24 @@ def move_future_jobs(self) -> int:
                 'Worker %s on %s detected dead, re-enqueuing its jobs',
                 dead_broker['id'], dead_broker['name']
             )
-            num_jobs_re_enqueued = self.enqueue_jobs_from_dead_broker(
-                uuid.UUID(dead_broker['id'])
+            num_jobs_re_enqueued, failed_jobs = (
+                self.enqueue_jobs_from_dead_broker(
+                    uuid.UUID(dead_broker['id'])
+                )
             )
             logger.warning(
                 'Worker %s on %s marked as dead, %d jobs were re-enqueued',
                 dead_broker['id'], dead_broker['name'],
                 num_jobs_re_enqueued
             )
+            # Mark failed jobs appropriately.
+            jobs = [Job.deserialize(job) for job in failed_jobs]
+            err = Exception(
+                "Worker %s died and max_retries exceeded" % dead_broker['name']
+            )
+            for job in jobs:
+                advance_job_status(self.namespace, job, duration=0.0, err=err)
+
         logger.debug("Redis moved %s job(s) from future to current queues",
                      num_jobs_moved)
         return num_jobs_moved
@@ -287,8 +297,10 @@ def get_all_brokers(self) -> List[Dict[str, Union[None, str, int]]]:
         )
         return [json.loads(r.decode()) for r in rv]
 
-    def enqueue_jobs_from_dead_broker(self, dead_broker_id: uuid.UUID) -> int:
-        return self._run_script(
+    def enqueue_jobs_from_dead_broker(
+        self, dead_broker_id: uuid.UUID
+    ) -> Tuple[int, list]:
+        return tuple(self._run_script(
             self._enqueue_jobs_from_dead_broker,
             str(dead_broker_id),
             self._to_namespaced(RUNNING_JOBS_KEY.format(dead_broker_id)),
@@ -298,7 +310,7 @@ def enqueue_jobs_from_dead_broker(self, dead_broker_id: uuid.UUID) -> int:
             self._to_namespaced(NOTIFICATIONS_KEY),
             self._to_namespaced(MAX_CONCURRENCY_KEY),
             self._to_namespaced(CURRENT_CONCURRENCY_KEY),
-        )
+        ))
 
     def register_periodic_tasks(self, tasks: Iterable[Task]):
         """Register tasks that need to be scheduled periodically."""
diff --git a/spinach/brokers/redis_scripts/enqueue_jobs_from_dead_broker.lua b/spinach/brokers/redis_scripts/enqueue_jobs_from_dead_broker.lua
index 4476d95..f71fa59 100644
--- a/spinach/brokers/redis_scripts/enqueue_jobs_from_dead_broker.lua
+++ b/spinach/brokers/redis_scripts/enqueue_jobs_from_dead_broker.lua
@@ -8,6 +8,8 @@ local max_concurrency_key = ARGV[7]
 local current_concurrency_key = ARGV[8]
 
 local num_enqueued_jobs = 0
+local i = 1
+local failed_jobs = {}
 
 -- Get all jobs that were running on the broker before it died
 local jobs_json = redis.call('hvals', running_jobs_key)
@@ -16,7 +18,6 @@ for _, job_json in ipairs(jobs_json) do
     local job = cjson.decode(job_json)
     if job["retries"] < job["max_retries"] then
         job["retries"] = job["retries"] + 1
-
         -- Set job status to queued:
         -- A major difference between retrying a job failing in a worker and
         -- a failing from a dead broker is that the dead broker one is
@@ -39,6 +40,10 @@
         local queue = string.format("%s/%s", namespace, job["queue"])
         redis.call('rpush', queue, job_json)
         num_enqueued_jobs = num_enqueued_jobs + 1
+    else
+        -- Keep track of jobs that exceeded the max_retries
+        failed_jobs[i] = job_json
+        i = i + 1
     end
 
     redis.call('hdel', running_jobs_key, job["id"])
@@ -52,4 +57,4 @@ if num_enqueued_jobs > 0 then
     redis.call('publish', notifications, '')
 end
 
-return num_enqueued_jobs
+return {num_enqueued_jobs, failed_jobs}
diff --git a/tests/functional/__init__.py b/tests/functional/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/functional/test_concurrency.py b/tests/functional/test_concurrency.py
new file mode 100644
index 0000000..833d243
--- /dev/null
+++ b/tests/functional/test_concurrency.py
@@ -0,0 +1,31 @@
+import pytest
+import time
+
+from spinach.brokers.memory import MemoryBroker
+from spinach.brokers.redis import RedisBroker
+from spinach.engine import Engine
+
+
+@pytest.fixture(params=[MemoryBroker, RedisBroker])
+def spin(request):
+    broker = request.param
+    spin = Engine(broker(), namespace='tests')
+    yield spin
+
+
+def test_concurrency_limit(spin):
+    count = 0
+
+    @spin.task(name='do_something', max_retries=10, max_concurrency=1)
+    def do_something(index):
+        nonlocal count
+        assert index == count
+        count += 1
+
+    for i in range(0, 5):
+        spin.schedule(do_something, i)
+
+    # Start two workers; test that only one job runs at once as per the
+    # Task definition.
+    spin.start_workers(number=2, block=True, stop_when_queue_empty=True)
+    assert count == 5
diff --git a/tests/test_brokers.py b/tests/test_brokers.py
index b9911cb..6046f3a 100644
--- a/tests/test_brokers.py
+++ b/tests/test_brokers.py
@@ -96,7 +96,7 @@ def test_flush(broker):
 def test_enqueue_jobs_from_dead_broker(broker):
     # Marking a broker that doesn't exist as dead
     broker_id = uuid.UUID('62664577-cf89-4f6a-ab16-4e20ec8fe4c2')
-    assert broker.enqueue_jobs_from_dead_broker(broker_id) == 0
+    assert broker.enqueue_jobs_from_dead_broker(broker_id) == (0, [])
 
 
 def test_get_broker_info(broker):
diff --git a/tests/test_redis_brokers.py b/tests/test_redis_brokers.py
index 5ea7878..67679a1 100644
--- a/tests/test_redis_brokers.py
+++ b/tests/test_redis_brokers.py
@@ -16,6 +16,7 @@
 )
 from spinach.job import Job, JobStatus
 from spinach.task import Task
+from spinach import signals
 
 CONCURRENT_TASK_NAME = 'i_am_concurrent'
 
@@ -281,7 +282,7 @@ def test_enqueue_jobs_from_dead_broker(broker, broker_2):
     assert current == b'1'
 
     # Mark broker as dead, should re-enqueue only the idempotent jobs.
-    assert broker_2.enqueue_jobs_from_dead_broker(broker._id) == 2
+    assert broker_2.enqueue_jobs_from_dead_broker(broker._id) == (2, [])
 
     # Check that the current_concurrency was decremented for job_3.
     current = broker._r.hget(
@@ -301,11 +302,36 @@ def test_enqueue_jobs_from_dead_broker(broker, broker_2):
 
     # Check that a broker can be marked as dead multiple times
     # without duplicating jobs
-    assert broker_2.enqueue_jobs_from_dead_broker(broker._id) == 0
+    assert broker_2.enqueue_jobs_from_dead_broker(broker._id) == (0, [])
+
+
+def test_enqueue_fails_jobs_from_dead_broker_if_max_retries_exceeded(
+    broker, broker_2
+):
+    job_1 = Job('foo_task', 'foo_queue', datetime.now(timezone.utc), 1)
+    job_1.retries = 1
+    job_2 = Job('foo_task', 'foo_queue', datetime.now(timezone.utc), 10)
+    broker.enqueue_jobs([job_1, job_2])
+
+    # Start the job.
+    broker.get_jobs_from_queue('foo_queue', 100)
+
+    # Simulate a dead broker.
+    num_requeued, failed_jobs = broker_2.enqueue_jobs_from_dead_broker(
+        broker._id
+    )
+
+    # Check that one was requeued and the one marked failed is job_1.
+    assert num_requeued == 1
+    jobs = [Job.deserialize(job.decode()) for job in failed_jobs]
+    job_1.status = JobStatus.RUNNING
+    assert [job_1] == jobs
 
 
 def test_detect_dead_broker(broker, broker_2):
-    broker_2.enqueue_jobs_from_dead_broker = Mock(return_value=10)
+    broker_2.enqueue_jobs_from_dead_broker = Mock(
+        return_value=(10, [])
+    )
 
     # Register the first broker
     broker.move_future_jobs()
@@ -321,8 +347,34 @@ def test_detect_dead_broker(broker, broker_2):
     )
 
 
+def test_dead_jobs_exceeding_max_retries_are_marked_failed(broker, broker_2):
+    job_1 = Job('foo_task', 'foo_queue', datetime.now(timezone.utc), 1)
+    job_1.retries = 1
+    # Register the first broker
+    broker.move_future_jobs()
+    broker_2.enqueue_jobs_from_dead_broker = Mock(
+        return_value=(0, [job_1.serialize()])
+    )
+    # Set the 2nd broker to detect dead brokers after 2 seconds of inactivity
+    broker_2.broker_dead_threshold_seconds = 2
+    time.sleep(2.1)
+
+    signal_called = False
+
+    @signals.job_failed.connect
+    def check_job(namespace, job, err, **kwargs):
+        nonlocal signal_called
+        signal_called = True
+        assert job.status == JobStatus.FAILED
+
+    assert 0 == broker_2.move_future_jobs()
+    assert True is signal_called
+
+
 def test_not_detect_deregistered_broker_as_dead(broker, broker_2):
-    broker_2.enqueue_jobs_from_dead_broker = Mock(return_value=10)
+    broker_2.enqueue_jobs_from_dead_broker = Mock(
+        return_value=(10, [])
+    )
 
     # Register and de-register the first broker
     broker.move_future_jobs()
diff --git a/tox.ini b/tox.ini
index 882aaba..8e83d76 100644
--- a/tox.ini
+++ b/tox.ini
@@ -14,6 +14,7 @@
 deps =
     pytest-cov
     pytest-threadleak
     pycodestyle
+    flake8
     flask
     django
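
A note on the user-visible effect of this change: jobs that were running on a dead worker and have already exhausted max_retries now go through the normal failure path, so the job_failed signal fires for them instead of the jobs silently disappearing. Below is a minimal sketch of observing this from application code, assuming a reachable local Redis server; the 'docs' namespace and the print handler are illustrative only, while Engine, RedisBroker and the (namespace, job, err) arguments of signals.job_failed are used exactly as in the tests above.

    from spinach import signals
    from spinach.brokers.redis import RedisBroker
    from spinach.engine import Engine

    spin = Engine(RedisBroker(), namespace='docs')

    @signals.job_failed.connect
    def on_job_failed(namespace, job, err, **kwargs):
        # After this change, the handler also fires for jobs whose worker
        # died after they had used up max_retries, with err set to the
        # "Worker ... died and max_retries exceeded" exception built in
        # RedisBroker.move_future_jobs().
        print('Job failed in %s: %s (%s)' % (namespace, job, err))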