diff --git a/spinach/engine.py b/spinach/engine.py index f1ac7b9..f37490f 100644 --- a/spinach/engine.py +++ b/spinach/engine.py @@ -10,7 +10,7 @@ from .brokers.base import Broker from .const import DEFAULT_QUEUE, DEFAULT_NAMESPACE from .worker import Workers -from . import signals +from . import signals, exc logger = getLogger(__name__) @@ -55,13 +55,14 @@ def attach_tasks(self, tasks: Tasks): tasks._spin = self def _get_task(self, name) -> Task: - try: - return self._tasks[name] - except KeyError: - raise ValueError( - 'Unknown task "{}". Known tasks: {}' - .format(name, list(self._tasks.keys())) - ) + task = self._tasks.get(name) + if task is not None: + return task + + raise exc.UnknownTask( + 'Unknown task "{}", known tasks: {}' + .format(name, list(self._tasks.keys())) + ) def execute(self, task_name: str, *args, **kwargs): return self._get_task(task_name).func(*args, **kwargs) @@ -102,8 +103,12 @@ def _arbiter_func(self): job = self._broker.get_job_from_queue(self._working_queue) while job: - job.task_func = self._get_task(job.task_name).func - self._workers.submit_job(job) + try: + job.task_func = self._get_task(job.task_name).func + except exc.UnknownTask as err: + self._job_finished_callback(job, 0.0, err) + else: + self._workers.submit_job(job) if self._workers.can_accept_job(): job = self._broker.get_job_from_queue( diff --git a/spinach/exc.py b/spinach/exc.py new file mode 100644 index 0000000..f274507 --- /dev/null +++ b/spinach/exc.py @@ -0,0 +1,6 @@ +class SpinachError(Exception): + """Base class for other Spinach exceptions.""" + + +class UnknownTask(SpinachError): + """Task name is not registered with the Engine.""" diff --git a/tests/test_engine.py b/tests/test_engine.py index b81b823..08fb33c 100644 --- a/tests/test_engine.py +++ b/tests/test_engine.py @@ -1,9 +1,12 @@ from datetime import datetime, timezone +from unittest.mock import patch +import time import pytest from spinach import Engine, MemoryBroker, RetryException from spinach.job import Job, JobStatus +from spinach.exc import UnknownTask @pytest.fixture @@ -43,3 +46,8 @@ def test_job_finished_callback(spin): job.max_retries = 0 spin._job_finished_callback(job, 1.0, RetryException('Must retry', at=now)) assert job.status is JobStatus.FAILED + + +def test_schedule_unknown_task(spin): + with pytest.raises(UnknownTask): + spin.schedule('foo_task')